X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fxapp_utils.hpp;fp=src%2Fxapp_utils.hpp;h=4537ebc91382102db34d72c5c9d5d9d55d6a4323;hb=4e545a8b013e60f2ff59254cb3fe435012d8fe5a;hp=fd9843c10c23320d4430ca0f347a6f860d5ea815;hpb=82ba4b9999ca8e09461315a919b36a66641a6c7d;p=ric-app%2Fadmin.git diff --git a/src/xapp_utils.hpp b/src/xapp_utils.hpp index fd9843c..4537ebc 100644 --- a/src/xapp_utils.hpp +++ b/src/xapp_utils.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -36,7 +37,6 @@ #ifndef XAPP_UTIL # define XAPP_UTIL -#define DEBUG 0 #define XAPP_NAME_LENGTH 128 #define PROTO_PORT_LENGTH 16 @@ -55,6 +55,15 @@ #define unlikely(x) (x) #endif + +// define RMR Send behaviour for different link-types +// controls how often we try and delay between tries, as well as method + +enum link_types {LOW_LATENCY, HIGH_RELIABILITY}; +static const unsigned int link_delays[] = {1, 10}; // milli-seconds to wait before retries +static const unsigned int link_retries[] = {4, 15}; // number of times to retry +enum tx_types {ROUTE, RTS}; // regular rmr or rts + void init_logger(const char *AppName, mdclog_severity_t log_level); @@ -63,46 +72,37 @@ class XaPP { public: XaPP(char *, char *, int); - XaPP(char *, char *, int, int); ~XaPP(void); XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise .. - std::string getName(void); - int getStatus(void); + std::string get_name(void); + + int get_status(void); + + size_t get_num_active_threads(void) const { return thread_group.size(); }; + // ideally can reduce tempate definitions to just two // but for now leaving it open ... - // template definition to allow a user defined - // processor to be started in multiple threads - template - void Start(messageProcessor &&); - - // template definition to allow a user defined - // processor and error handler if a send fails - // to be started in multiple threads - template - void Start(messageProcessor &&, errorHandler &&); - // Template to allow a user defined processor to start - // on a single thread each time it is invoked + // on a thread template unsigned int StartThread(messageProcessor &&); - // Template to allow a user defined processor and + // Template to allow a user defined processor AND // error handle to start // on a single thread each time it // is invoked template unsigned int StartThread(messageProcessor &&, errorHandler &&); void Stop(void); - bool Send(int type, int payload_len, void *payload); - bool Send(int type, int payload_len, void *payload, unsigned char const *meid); - bool Send(rmr_mbuf_t * rmr_tx_message); + + // various flavours of send : first two finally call the last + bool Send(int type, size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + bool Send(int type, size_t payload_len, void *payload, unsigned char const *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + void * get_rmr_context(void); - void set_num_retries(int ); - int get_num_retries(void ); - unsigned long get_Send_attempts(void); - unsigned long get_Send_fails(void); private: @@ -119,10 +119,9 @@ private: int _is_ready; bool _listen; int _num_retries; + int _retry_interval; int _msg_size; unsigned int _num_threads; - unsigned long _num_attempts; - unsigned long _num_fails; void* _rmr_ctx; std::mutex *_transmit; @@ -133,7 +132,7 @@ private: }; - +// main workhorse thread which does the listen->process->respond loop template void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){ @@ -141,6 +140,8 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler // Get the thread id std::thread::id my_id = std::this_thread::get_id(); std::stringstream thread_id; + std::stringstream ss; + thread_id << my_id; // Stats counters @@ -148,65 +149,40 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler unsigned long attempts = 0; unsigned long fails = 0; - // Get the rmr context + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) void *rmr_context = parent->get_rmr_context(); - if (!rmr_context){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + assert(rmr_context != NULL); + // Get buffer specific to this thread - rmr_mbuf_t *rmr_message; - if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){ - // throw exception here .. - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + rmr_mbuf_t *rmr_message = NULL; + rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE); + assert(rmr_message != NULL); // Create an epoll instance int rcv_fd, ep_fd; struct epoll_event eve, trigger; - if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - - if( (ep_fd = epoll_create1(0) ) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll file descriptor :" + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + rcv_fd = rmr_get_rcvfd(rmr_context); + assert(rcv_fd > 0); + + ep_fd = epoll_create1(0); + assert(ep_fd > 0); + trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT; trigger.data.fd = rcv_fd; if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering epoll file descriptor : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; + ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); + throw std::runtime_error(ss.str()); } - int num_retries = this->get_num_retries(); - int i = 0; int num_fds = 0; bool send_ok; mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str()); - + + // the workhorse loop while(parent->_isRunning()){ num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT); @@ -216,11 +192,9 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler // Re-arm the trigger if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; + ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); + throw std::runtime_error(ss.str()); } } @@ -233,54 +207,25 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler recvs++; bool res = msg_fn(rmr_message); - // is there anything to send + // is there anything to send ? if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){ - i = 0; - rmr_message->sub_id = RMR_VOID_SUBID; + + rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ? send_ok = false; - while(i < num_retries){ - - // Need to handle differently depending on whether message - // is for A1 (determined by type) or non-A1. - // For now, A1 requires we bypass the routing table and send - // directly back to originator using rmr_rts_msg rather than - // over the bus - - if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){ - rmr_message = rmr_rts_msg(rmr_context, rmr_message); - } - else{ - rmr_message->state = 0; // fix for nng - rmr_message = rmr_send_msg(rmr_context, rmr_message); - } - attempts ++; - - if (! rmr_message){ - // CRITICAL error. break out of loop - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr_send returned NULL"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - break; - } - else if (rmr_message->state == RMR_OK){ - send_ok = true; - break; - } - - else if(rmr_message->state == RMR_ERR_RETRY){ - i++; - fails++; - } - else{ - mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno)); - break; - } - + if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){ + // for a1 messages we use send in high reliability mode and RTS + send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS); + } + else{ + rmr_message->state = 0; // fix for nng + send_ok = Send(rmr_message); } - + attempts ++; + if (send_ok == false){ error_handler(rmr_message); + fails ++; } } @@ -302,37 +247,6 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails); } - -template -void XaPP::Start(messageProcessor && msg_fn){ - - std::lock_guard guard(*_transmit); - _listen = true; - - // Spin up the the workThreads ..... - for(unsigned int i = 0; i < _num_threads; i++){ - thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);})))); - } - -}; - -// template if calling function provides an error handler also -template -void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){ - - std::lock_guard guard(*_transmit); - _listen = true; - - // Spin up the the workThreads ..... - for(unsigned int i = 0; i < _num_threads; i++){ - //std::cout << "Starting thread number " << i << std::endl; - thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);})))); - - } - - -}; - // Template to allow a user defined processor to start // on a specific thread template