X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fxapp_utils.cc;fp=src%2Fxapp_utils.cc;h=68394bcab5935d84510e46cb07bab410c2418fc1;hb=4e545a8b013e60f2ff59254cb3fe435012d8fe5a;hp=8130a1ddc115a02773a51024194d5bb1d9286717;hpb=82ba4b9999ca8e09461315a919b36a66641a6c7d;p=ric-app%2Fadmin.git diff --git a/src/xapp_utils.cc b/src/xapp_utils.cc index 8130a1d..68394bc 100644 --- a/src/xapp_utils.cc +++ b/src/xapp_utils.cc @@ -24,21 +24,8 @@ #include "xapp_utils.hpp" -// Constructor that automatically determines number of threads -XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){ - - _num_threads = std::thread::hardware_concurrency(); - strcpy(_xapp_name, xapp_name); - strcpy(_proto_port, proto_port); - init(_msg_size); - get_routes(); - -}; - - - -// Constructor that takes number of threads as argument -XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) { +// Constructor +XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _msg_size(msg_size) { strcpy(_xapp_name, xapp_name); strcpy(_proto_port, proto_port); @@ -59,6 +46,7 @@ XaPP::~XaPP(void){ if (_rmr_ctx){ rmr_close(_rmr_ctx); } + // delete mutex delete _transmit; }; @@ -75,13 +63,8 @@ void XaPP::init(int msg_size){ } // Initialze the rmr context - if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){ - // throw exception here .. - std::stringstream ss; - ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl; - mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); - throw ss.str(); - } + _rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE); + assert(_rmr_ctx != NULL); } @@ -109,75 +92,73 @@ void XaPP::get_routes(void){ // Get a tx buffer in case we need to do a transmit from the main thread itself - if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){ - // throw exception here .. - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " Error getting a send buffer"; - throw std::runtime_error(error_string); - } - + _rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE); + assert(_rmr_tx_message != NULL); std::cout <<"Route Table received. Send buffer allocated" << std::endl; _transmit = new std::mutex(); } // Send method that takes TLV (type/length/value) input -bool XaPP::Send(int type, int payload_len, void *payload){ - - if (likely(_is_ready)){ - if (likely(payload_len <= RMR_BUFFER_SIZE)){ - _rmr_tx_message->mtype = type; - memcpy(_rmr_tx_message->payload, payload, payload_len); - _rmr_tx_message->len = payload_len; - return Send(_rmr_tx_message); - } - else{ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE); - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - } +bool XaPP::Send(int type, size_t payload_len, void *payload, link_types mode, tx_types send_type){ + + if (likely(payload_len <= RMR_BUFFER_SIZE)){ + _rmr_tx_message->mtype = type; + memcpy(_rmr_tx_message->payload, payload, payload_len); + _rmr_tx_message->len = payload_len; + return Send(_rmr_tx_message, mode, send_type); } else{ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr not ready to send"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); } - + return false; } // Send method that takes TLV (type/length/value) input + MEID -bool XaPP::Send(int type, int payload_len, void *payload, unsigned char const * meid){ - if (!_is_ready){ - return false; +bool XaPP::Send(int type, size_t payload_len, void *payload, unsigned char const * meid, link_types mode, tx_types send_type){ + if (likely(payload_len <= RMR_BUFFER_SIZE)){ + _rmr_tx_message->mtype = type; + memcpy(_rmr_tx_message->payload, payload, payload_len); + _rmr_tx_message->len = payload_len; + rmr_str2meid(_rmr_tx_message, meid); + return Send(_rmr_tx_message, mode, send_type); + } + else{ + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); } - _rmr_tx_message->mtype = type; - memcpy(_rmr_tx_message->payload, payload, payload_len); - _rmr_tx_message->len = payload_len; - rmr_str2meid(_rmr_tx_message, meid); - return Send(_rmr_tx_message); - + return false; } // Send method that takes a buffer -bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){ +bool XaPP::Send(rmr_mbuf_t * rmr_tx_message, link_types mode, tx_types send_type){ if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){ - int i = 0; + unsigned int i = 0; rmr_tx_message->sub_id = RMR_VOID_SUBID; - while(i <= _num_retries){ + + while(i <= link_retries[mode]){ //rmr_tx_message->state = 0; // fix for nng - rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message); - _num_attempts ++; + // how to send + if(likely(send_type == ROUTE)){ + rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message); + } + else{ + rmr_tx_message = rmr_rts_msg(_rmr_ctx, rmr_tx_message); + } if (! rmr_tx_message){ - // CRITICAL EROR .. log it - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr_send returned NULL"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + // CRITICAL EROR .. log it and return + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " RMR send function returned NULL. Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); return false; } else if (rmr_tx_message->state == RMR_OK){ @@ -185,7 +166,7 @@ bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){ } else if(rmr_tx_message->state == RMR_ERR_RETRY){ i++; - _num_fails++; + std::this_thread::sleep_for(std::chrono::milliseconds(link_delays[mode])); } else { mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno)); @@ -240,11 +221,11 @@ void XaPP::_error_handler(rmr_mbuf_t *message){ // Some get/set methods //--------------------------------------- -std::string XaPP::getName(void){ +std::string XaPP::get_name(void){ return std::string(_xapp_name); } -int XaPP::getStatus(void){ +int XaPP::get_status(void){ return _is_ready; } @@ -257,30 +238,6 @@ void * XaPP::get_rmr_context(void){ return _rmr_ctx; } -void XaPP::set_num_retries(int num_retries){ - if (num_retries < 0 || num_retries > MAX_RETRIES){ - throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n"; - } - - _num_retries = num_retries; -} - -int XaPP::get_num_retries(void){ - return _num_retries; -} - - -unsigned long XaPP::get_Send_attempts(void){ - return _num_attempts; -}; - - -unsigned long XaPP::get_Send_fails(void){ - return _num_fails; -}; - - - void init_logger(const char *AppName, mdclog_severity_t log_level) {