#include <unistd.h>
#include <string.h>
#include <error.h>
+#include <assert.h>
#include <thread>
#include <map>
#include <mutex>
#ifndef XAPP_UTIL
# define XAPP_UTIL
-#define DEBUG 0
#define XAPP_NAME_LENGTH 128
#define PROTO_PORT_LENGTH 16
#define unlikely(x) (x)
#endif
+
+// define RMR Send behaviour for different link-types
+// controls how often we try and delay between tries, as well as method
+
+enum link_types {LOW_LATENCY, HIGH_RELIABILITY};
+static const unsigned int link_delays[] = {1, 10}; // milli-seconds to wait before retries
+static const unsigned int link_retries[] = {4, 15}; // number of times to retry
+enum tx_types {ROUTE, RTS}; // regular rmr or rts
+
void init_logger(const char *AppName, mdclog_severity_t log_level);
public:
XaPP(char *, char *, int);
- XaPP(char *, char *, int, int);
~XaPP(void);
XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise ..
- std::string getName(void);
- int getStatus(void);
+ std::string get_name(void);
+
+ int get_status(void);
+
+ size_t get_num_active_threads(void) const { return thread_group.size(); };
+
// ideally can reduce tempate definitions to just two
// but for now leaving it open ...
- // template definition to allow a user defined
- // processor to be started in multiple threads
- template <typename messageProcessor>
- void Start(messageProcessor &&);
-
- // template definition to allow a user defined
- // processor and error handler if a send fails
- // to be started in multiple threads
- template <typename messageProcessor, typename errorHandler>
- void Start(messageProcessor &&, errorHandler &&);
-
// Template to allow a user defined processor to start
- // on a single thread each time it is invoked
+ // on a thread
template <typename messageProcessor >
unsigned int StartThread(messageProcessor &&);
- // Template to allow a user defined processor and
+ // Template to allow a user defined processor AND
// error handle to start // on a single thread each time it
// is invoked
template <typename messageProcessor , typename errorHandler>
unsigned int StartThread(messageProcessor &&, errorHandler &&);
void Stop(void);
- bool Send(int type, int payload_len, void *payload);
- bool Send(int type, int payload_len, void *payload, unsigned char const *meid);
- bool Send(rmr_mbuf_t * rmr_tx_message);
+
+ // various flavours of send : first two finally call the last
+ bool Send(int type, size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+ bool Send(int type, size_t payload_len, void *payload, unsigned char const *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+ bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+
void * get_rmr_context(void);
- void set_num_retries(int );
- int get_num_retries(void );
- unsigned long get_Send_attempts(void);
- unsigned long get_Send_fails(void);
private:
int _is_ready;
bool _listen;
int _num_retries;
+ int _retry_interval;
int _msg_size;
unsigned int _num_threads;
- unsigned long _num_attempts;
- unsigned long _num_fails;
void* _rmr_ctx;
std::mutex *_transmit;
};
-
+// main workhorse thread which does the listen->process->respond loop
template <typename messageProcessor, typename errorHandler>
void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
// Get the thread id
std::thread::id my_id = std::this_thread::get_id();
std::stringstream thread_id;
+ std::stringstream ss;
+
thread_id << my_id;
// Stats counters
unsigned long attempts = 0;
unsigned long fails = 0;
- // Get the rmr context
+ // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe)
void *rmr_context = parent->get_rmr_context();
- if (!rmr_context){
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available";
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
- }
-
+ assert(rmr_context != NULL);
+
// Get buffer specific to this thread
- rmr_mbuf_t *rmr_message;
- if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){
- // throw exception here ..
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string reason = strerror(errno);
- std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
- }
-
+ rmr_mbuf_t *rmr_message = NULL;
+ rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE);
+ assert(rmr_message != NULL);
// Create an epoll instance
int rcv_fd, ep_fd;
struct epoll_event eve, trigger;
- if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string reason = strerror(errno);
- std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason;
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
- }
-
- if( (ep_fd = epoll_create1(0) ) < 0){
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string reason = strerror(errno);
- std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll file descriptor :" + reason;
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
- }
-
+ rcv_fd = rmr_get_rcvfd(rmr_context);
+ assert(rcv_fd > 0);
+
+ ep_fd = epoll_create1(0);
+ assert(ep_fd > 0);
+
trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
trigger.data.fd = rcv_fd;
if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string reason = strerror(errno);
- std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering epoll file descriptor : " + reason;
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
+ ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl;
+ mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
+ throw std::runtime_error(ss.str());
}
- int num_retries = this->get_num_retries();
- int i = 0;
int num_fds = 0;
bool send_ok;
mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str());
-
+
+ // the workhorse loop
while(parent->_isRunning()){
num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
// Re-arm the trigger
if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string reason = strerror(errno);
- std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason;
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- throw error_string;
+ ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl;
+ mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
+ throw std::runtime_error(ss.str());
}
}
recvs++;
bool res = msg_fn(rmr_message);
- // is there anything to send
+ // is there anything to send ?
if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
- i = 0;
- rmr_message->sub_id = RMR_VOID_SUBID;
+
+ rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ?
send_ok = false;
- while(i < num_retries){
-
- // Need to handle differently depending on whether message
- // is for A1 (determined by type) or non-A1.
- // For now, A1 requires we bypass the routing table and send
- // directly back to originator using rmr_rts_msg rather than
- // over the bus
-
- if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){
- rmr_message = rmr_rts_msg(rmr_context, rmr_message);
- }
- else{
- rmr_message->state = 0; // fix for nng
- rmr_message = rmr_send_msg(rmr_context, rmr_message);
- }
- attempts ++;
-
- if (! rmr_message){
- // CRITICAL error. break out of loop
- std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
- std::string error_string = identifier + " rmr_send returned NULL";
- mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
- break;
- }
- else if (rmr_message->state == RMR_OK){
- send_ok = true;
- break;
- }
-
- else if(rmr_message->state == RMR_ERR_RETRY){
- i++;
- fails++;
- }
- else{
- mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno));
- break;
- }
-
+ if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){
+ // for a1 messages we use send in high reliability mode and RTS
+ send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS);
+ }
+ else{
+ rmr_message->state = 0; // fix for nng
+ send_ok = Send(rmr_message);
}
-
+ attempts ++;
+
if (send_ok == false){
error_handler(rmr_message);
+ fails ++;
}
}
mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
}
-
-template <typename messageProcessor>
-void XaPP::Start(messageProcessor && msg_fn){
-
- std::lock_guard<std::mutex> guard(*_transmit);
- _listen = true;
-
- // Spin up the the workThreads .....
- for(unsigned int i = 0; i < _num_threads; i++){
- thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
- }
-
-};
-
-// template if calling function provides an error handler also
-template <typename messageProcessor, typename errorHandler>
-void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){
-
- std::lock_guard<std::mutex> guard(*_transmit);
- _listen = true;
-
- // Spin up the the workThreads .....
- for(unsigned int i = 0; i < _num_threads; i++){
- //std::cout << "Starting thread number " << i << std::endl;
- thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
-
- }
-
-
-};
-
// Template to allow a user defined processor to start
// on a specific thread
template <typename messageProcessor>