/* ================================================================================== Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ================================================================================== */ #include "xapp_utils.hpp" // Constructor that automatically determines number of threads XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){ _num_threads = std::thread::hardware_concurrency(); strcpy(_xapp_name, xapp_name); strcpy(_proto_port, proto_port); _redis_port = redis_port; init(_msg_size); get_routes(); redisInit(); }; // Constructor that takes number of threads as argument XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) { strcpy(_xapp_name, xapp_name); strcpy(_proto_port, proto_port); _redis_port = redis_port; init(_msg_size); get_routes(); redisInit(); }; // Destructor closes rmr context if available XaPP::~XaPP(void){ // Call stop to clear thread table Stop(); // free memory rmr_free_msg(_rmr_tx_message); if (_rmr_ctx){ rmr_close(_rmr_ctx); } // delete mutex delete _transmit; /* Disconnects and frees the context */ redisFree(c); }; // Get the RMR context void XaPP::init(int msg_size){ if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){ std::stringstream ss; ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl; ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl; mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); throw ss.str(); } // Initialze the rmr context if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){ // throw exception here .. std::stringstream ss; ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl; mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); throw ss.str(); } } // Get the Redis Context void XaPP::redisInit(){ // Initialize the Redis Context c = redisConnect("127.0.0.1", _redis_port); if (c == NULL || c->err) { std::stringstream ss; if (c) { ss << "Error: " << c->errstr; // handle error } else { ss <<"Can't allocate redis context"; } mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); throw ss.str(); } } // Blocks till routing table is received void XaPP::get_routes(void){ // Wait for getting a routing table ... int i = 0; _is_ready = 0; while( i < MAX_WAIT_TIME){ std::cout <<"Waiting for RMR to be ready " << std::endl; if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){ break; } sleep(1); i++; }; if(!_is_ready){ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; std::string error_string = identifier + " Error getting routing table"; throw std::runtime_error(error_string); } // Get a tx buffer in case we need to do a transmit from the main thread itself if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){ // throw exception here .. std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; std::string error_string = identifier + " Error getting a send buffer"; throw std::runtime_error(error_string); } std::cout <<"Route Table received. Send buffer allocated" << std::endl; _transmit = new std::mutex(); } // Send method that takes TLV (type/length/value) input bool XaPP::Send(int type, int payload_len, void *payload){ if (likely(_is_ready)){ if (likely(payload_len <= RMR_BUFFER_SIZE)){ _rmr_tx_message->mtype = type; memcpy(_rmr_tx_message->payload, payload, payload_len); _rmr_tx_message->len = payload_len; return Send(_rmr_tx_message); } else{ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE); mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); } } else{ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; std::string error_string = identifier + " rmr not ready to send"; mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); } return false; } // Send method that takes TLV (type/length/value) input + MEID bool XaPP::Send(int type, int payload_len, void *payload, unsigned char const * meid){ if (!_is_ready){ return false; } _rmr_tx_message->mtype = type; memcpy(_rmr_tx_message->payload, payload, payload_len); _rmr_tx_message->len = payload_len; rmr_str2meid(_rmr_tx_message, meid); return Send(_rmr_tx_message); } // Send method that takes a buffer bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){ if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){ int i = 0; rmr_tx_message->sub_id = RMR_VOID_SUBID; while(i <= _num_retries){ //rmr_tx_message->state = 0; // fix for nng rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message); _num_attempts ++; if (! rmr_tx_message){ // CRITICAL EROR .. log it std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; std::string error_string = identifier + " rmr_send returned NULL"; mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); return false; } else if (rmr_tx_message->state == RMR_OK){ return true; } else if(rmr_tx_message->state == RMR_ERR_RETRY){ i++; _num_fails++; } else { mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno)); return false; } } } else{ mdclog_write(MDCLOG_ERR, "Error: %s, %d Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len); return false; } std::stringstream ss; ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno); mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); return false; } void XaPP::Stop(void){ // Get the mutex lock std::lock_guard guard(*_transmit); _listen = false; //Wait for all threads to stop for(auto & t: thread_group){ std::thread::id my_id = t.second.get_id(); std::stringstream thread_id; thread_id << my_id; t.second.join(); } // Clear thread table ... thread_group.clear(); } // default error handler if non specified by user // pass through for now void XaPP::_error_handler(rmr_mbuf_t *message){ }; //---------------------------------------- // Some get/set methods //--------------------------------------- std::string XaPP::getName(void){ return std::string(_xapp_name); } int XaPP::getStatus(void){ return _is_ready; } bool XaPP::_isRunning(void){ return _listen; } void * XaPP::get_rmr_context(void){ return _rmr_ctx; } void XaPP::set_num_retries(int num_retries){ if (num_retries < 0 || num_retries > MAX_RETRIES){ throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n"; } _num_retries = num_retries; } int XaPP::get_num_retries(void){ return _num_retries; } unsigned long XaPP::get_Send_attempts(void){ return _num_attempts; }; unsigned long XaPP::get_Send_fails(void){ return _num_fails; }; // Initialization of Log level void init_logger(const char *AppName, mdclog_severity_t log_level) { mdclog_attr_t *attr; mdclog_attr_init(&attr); mdclog_attr_set_ident(attr, AppName); mdclog_init(attr); mdclog_level_set(log_level); mdclog_attr_destroy(attr); }