User story RICPLT-2620
[ric-app/admin.git] / src / xapp_utils.hpp
index fd9843c..4537ebc 100644 (file)
@@ -22,6 +22,7 @@
 #include <unistd.h>
 #include <string.h>
 #include <error.h>
+#include <assert.h>
 #include <thread>
 #include <map>
 #include <mutex>
@@ -36,7 +37,6 @@
 #ifndef XAPP_UTIL
 # define XAPP_UTIL
 
-#define DEBUG 0
 
 #define XAPP_NAME_LENGTH 128
 #define PROTO_PORT_LENGTH 16
 #define unlikely(x) (x)
 #endif
 
+
+// define RMR Send behaviour for different link-types
+// controls how often we try and delay between tries, as well as method
+
+enum  link_types {LOW_LATENCY, HIGH_RELIABILITY};
+static const unsigned int link_delays[] = {1, 10}; // milli-seconds to wait before retries
+static const unsigned int link_retries[] = {4, 15}; // number of times to retry
+enum  tx_types {ROUTE, RTS}; // regular rmr or rts
+
 void init_logger(const char  *AppName, mdclog_severity_t log_level);
 
 
@@ -63,46 +72,37 @@ class XaPP {
  public:
 
   XaPP(char *, char *, int);
-  XaPP(char *, char *, int, int);
   ~XaPP(void);
   XaPP(XaPP &&) = default;  // destructor freaks out with non-copyable thread otherwise ..
-  std::string getName(void);
-  int  getStatus(void);
 
+  std::string get_name(void);
+
+  int  get_status(void);
+
+  size_t  get_num_active_threads(void) const { return thread_group.size(); };
+  
   // ideally can reduce tempate definitions to just two
   // but for now leaving it open ...
   
-  // template definition to allow a user defined
-  // processor to be started in multiple threads
-  template <typename messageProcessor>
-  void Start(messageProcessor &&);
-
-  // template definition to allow a user defined
-  // processor and  error handler if a send fails 
-  // to be started in multiple threads
-  template <typename messageProcessor, typename errorHandler>
-  void Start(messageProcessor &&, errorHandler &&);
-
   // Template to allow a user defined processor to start
-  // on a single thread each time it is invoked
+  // on a thread 
   template <typename messageProcessor > 
   unsigned int  StartThread(messageProcessor &&);
 
-  // Template to allow a user defined processor and 
+  // Template to allow a user defined processor AND
   // error handle to start // on a single thread each time it
   // is invoked
   template <typename messageProcessor , typename errorHandler> 
   unsigned int StartThread(messageProcessor &&, errorHandler &&);
 
   void Stop(void);
-  bool Send(int type,  int payload_len, void *payload);
-  bool Send(int type, int payload_len, void *payload, unsigned char const  *meid);
-  bool Send(rmr_mbuf_t * rmr_tx_message);
+
+  // various flavours of send : first two finally call the last
+  bool Send(int type,  size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+  bool Send(int type, size_t  payload_len, void *payload, unsigned char const  *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+  bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
+  
   void * get_rmr_context(void);
-  void set_num_retries(int );
-  int  get_num_retries(void );
-  unsigned long get_Send_attempts(void);
-  unsigned long get_Send_fails(void);
   
 private:
 
@@ -119,10 +119,9 @@ private:
   int _is_ready;
   bool _listen;
   int _num_retries;
+  int _retry_interval;
   int _msg_size;
   unsigned int _num_threads;
-  unsigned long _num_attempts;
-  unsigned long _num_fails;
 
   void* _rmr_ctx;
   std::mutex *_transmit;
@@ -133,7 +132,7 @@ private:
   
 };
 
-
+// main workhorse thread which does the listen->process->respond loop 
 template <typename messageProcessor, typename errorHandler>
 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
 
@@ -141,6 +140,8 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler
   // Get the thread id 
   std::thread::id my_id = std::this_thread::get_id();
   std::stringstream thread_id;
+  std::stringstream ss;
+  
   thread_id << my_id;
 
   // Stats counters 
@@ -148,65 +149,40 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler
   unsigned long attempts = 0;
   unsigned long fails = 0;
  
-  // Get the rmr context 
+  // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe)
   void *rmr_context = parent->get_rmr_context(); 
-  if (!rmr_context){
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-    std::string error_string = identifier +  " Thread : " + thread_id.str() + " Listener cannot run : no context available";
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    throw error_string;
-  }
-
+  assert(rmr_context != NULL);
+  
   // Get buffer specific to this thread
-  rmr_mbuf_t *rmr_message;
-  if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){
-    // throw exception here ..
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-    std::string reason = strerror(errno);
-    std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    throw error_string;
-  }
-
+  rmr_mbuf_t *rmr_message = NULL;
+  rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE);
+  assert(rmr_message != NULL);
 
   // Create an epoll instance
   int rcv_fd, ep_fd;
   struct epoll_event eve, trigger;
-  if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
-    std::string reason = strerror(errno);
-    std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason;
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    throw error_string;
-  }
-
-  if( (ep_fd = epoll_create1(0) ) < 0){
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
-    std::string reason = strerror(errno);
-    std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll  file descriptor :" + reason;
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    throw error_string;
-  }
-
+  rcv_fd = rmr_get_rcvfd(rmr_context);
+  assert(rcv_fd > 0);
+  
+  ep_fd = epoll_create1(0);
+  assert(ep_fd > 0);
+  
   trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
   trigger.data.fd = rcv_fd; 
 
   if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
-    std::string reason = strerror(errno);
-    std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering  epoll  file descriptor : " + reason;
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    throw error_string;
+    ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl;
+    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
+    throw std::runtime_error(ss.str());
   }
 
   
-  int num_retries = this->get_num_retries();
-  int i = 0;
   int num_fds = 0;
   bool send_ok;
   
   mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());
-  
+
+  // the workhorse loop 
   while(parent->_isRunning()){
     num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
     
@@ -216,11 +192,9 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler
       
       // Re-arm the trigger
       if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
-       std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
-       std::string reason = strerror(errno);
-       std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason;
-       mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-       throw error_string;
+       ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl;
+       mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
+       throw std::runtime_error(ss.str());
       }
     
     }
@@ -233,54 +207,25 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler
       recvs++;
       bool res = msg_fn(rmr_message);
 
-      // is there anything to send 
+      // is there anything to send ?
       if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
-       i = 0;
-       rmr_message->sub_id = RMR_VOID_SUBID;
+
+       rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ? 
        send_ok = false;
        
-       while(i < num_retries){
-         
-         // Need to handle differently depending on whether message
-         // is for A1 (determined by type) or non-A1.
-         // For now, A1 requires we bypass the routing table and send
-         // directly back to originator using rmr_rts_msg rather than
-         // over the bus
-
-         if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){
-           rmr_message = rmr_rts_msg(rmr_context, rmr_message);
-         }
-         else{
-           rmr_message->state = 0; // fix for nng
-           rmr_message = rmr_send_msg(rmr_context, rmr_message);   
-         }
-         attempts ++;
-
-         if (! rmr_message){
-           // CRITICAL  error. break out of loop
-           std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-           std::string error_string = identifier + " rmr_send returned NULL";
-           mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-           break;
-         }
-         else if (rmr_message->state == RMR_OK){
-           send_ok = true;
-           break;
-         }
-         
-         else  if(rmr_message->state == RMR_ERR_RETRY){
-           i++;
-           fails++;
-         }
-         else{
-           mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno));
-           break;
-         }
-        
+       if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){
+         // for a1 messages we use send in high reliability mode and RTS
+         send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS);
+       }
+       else{
+         rmr_message->state = 0; // fix for nng
+         send_ok = Send(rmr_message);   
        }
-         
+       attempts ++;
+       
        if (send_ok == false){
          error_handler(rmr_message);
+         fails ++;
        }
        
       }
@@ -302,37 +247,6 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler
   mdclog_write(MDCLOG_INFO, "Finished  thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
 }
 
-
-template <typename messageProcessor>
-void XaPP::Start(messageProcessor && msg_fn){
-
-  std::lock_guard<std::mutex> guard(*_transmit);
-  _listen = true;
-
-  // Spin up the the workThreads ..... 
-  for(unsigned int i = 0; i < _num_threads; i++){
-    thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
-  }
-  
-};
-
-// template if calling function provides an error handler also
-template <typename messageProcessor, typename errorHandler>
-void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){
-
-  std::lock_guard<std::mutex> guard(*_transmit);
-  _listen = true;
-
-  // Spin up the the workThreads ..... 
-  for(unsigned int i = 0; i < _num_threads; i++){
-    //std::cout << "Starting thread number " << i << std::endl;
-    thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
-
-  }
-
-  
-};
-
 // Template to allow a user defined processor to start
 // on a specific thread 
 template <typename messageProcessor>