User story RICPLT-2620
[ric-app/admin.git] / src / xapp_utils.cc
index 8130a1d..68394bc 100644 (file)
 
 #include "xapp_utils.hpp"
 
-// Constructor that automatically determines number of threads
-XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){
-
-  _num_threads = std::thread::hardware_concurrency();
-  strcpy(_xapp_name, xapp_name);
-  strcpy(_proto_port, proto_port);
-  init(_msg_size);
-  get_routes();
-
-};
-  
-
-
-// Constructor that takes number of threads as argument 
-XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2),  _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {
+// Constructor
+XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false),  _msg_size(msg_size) {
 
   strcpy(_xapp_name, xapp_name);
   strcpy(_proto_port, proto_port);
@@ -59,6 +46,7 @@ XaPP::~XaPP(void){
   if (_rmr_ctx){
     rmr_close(_rmr_ctx);
   }
+  
   // delete mutex
   delete _transmit;
 };
@@ -75,13 +63,8 @@ void XaPP::init(int msg_size){
   }
   
   // Initialze the rmr context
-  if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){
-    // throw exception here ..
-    std::stringstream ss;
-    ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;
-    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
-    throw ss.str();
-  }
+  _rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE);
+  assert(_rmr_ctx != NULL);
   
 }
 
@@ -109,75 +92,73 @@ void XaPP::get_routes(void){
   
 
   // Get a tx buffer in case we need to do a transmit from the main thread itself
-  if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){
-    // throw exception here ..
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-    std::string error_string = identifier + " Error getting a send buffer";
-    throw std::runtime_error(error_string);
-  }
-
+  _rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE);
+  assert(_rmr_tx_message != NULL);
   std::cout <<"Route Table received. Send buffer allocated" << std::endl;
   _transmit = new std::mutex();
 
 }
 
 // Send method that takes TLV (type/length/value) input
-bool XaPP::Send(int type,  int payload_len, void *payload){
-
-  if (likely(_is_ready)){
-    if (likely(payload_len <= RMR_BUFFER_SIZE)){
-      _rmr_tx_message->mtype  = type;
-      memcpy(_rmr_tx_message->payload, payload, payload_len);
-      _rmr_tx_message->len = payload_len;
-      return Send(_rmr_tx_message);
-    }
-    else{
-       std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-       std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);
-       mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
-    }
+bool XaPP::Send(int type,  size_t payload_len, void *payload, link_types mode, tx_types send_type){
+
+  if (likely(payload_len <= RMR_BUFFER_SIZE)){
+    _rmr_tx_message->mtype  = type;
+    memcpy(_rmr_tx_message->payload, payload, payload_len);
+    _rmr_tx_message->len = payload_len;
+    return Send(_rmr_tx_message, mode, send_type);
   }
   else{
-    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-    std::string error_string = identifier + " rmr not ready to send";
-    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
+    std::stringstream ss;
+    ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
+    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
   }
-  
+
   return false;
 }
 
 // Send method that takes TLV (type/length/value) input + MEID
-bool XaPP::Send(int type,  int payload_len, void *payload, unsigned char const * meid){
-  if (!_is_ready){
-    return false;
+bool XaPP::Send(int type,  size_t payload_len, void *payload, unsigned char const * meid, link_types mode, tx_types send_type){
+  if (likely(payload_len <= RMR_BUFFER_SIZE)){
+    _rmr_tx_message->mtype  = type;  
+    memcpy(_rmr_tx_message->payload, payload, payload_len);
+    _rmr_tx_message->len = payload_len;
+    rmr_str2meid(_rmr_tx_message, meid);
+    return Send(_rmr_tx_message, mode, send_type);
+  }
+  else{
+    std::stringstream ss;
+    ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
+    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
   }
 
-  _rmr_tx_message->mtype  = type;  
-  memcpy(_rmr_tx_message->payload, payload, payload_len);
-  _rmr_tx_message->len = payload_len;
-  rmr_str2meid(_rmr_tx_message, meid);
-  return Send(_rmr_tx_message);
-  
+  return false;
 }
 
 
 // Send method that takes a buffer
-bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){
+bool XaPP::Send(rmr_mbuf_t * rmr_tx_message, link_types mode, tx_types send_type){
 
   if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE  && rmr_tx_message->len > 0)){
-    int i = 0;
+    unsigned int i = 0;
     rmr_tx_message->sub_id = RMR_VOID_SUBID;
-    while(i <= _num_retries){
+    
+    while(i <= link_retries[mode]){
       
       //rmr_tx_message->state = 0; // fix for nng
-      rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
-      _num_attempts ++;
+      // how to send
+      if(likely(send_type == ROUTE)){
+       rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
+      }
+      else{
+       rmr_tx_message = rmr_rts_msg(_rmr_ctx, rmr_tx_message);
+      }
       
       if (! rmr_tx_message){
-        // CRITICAL EROR .. log it 
-        std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
-        std::string error_string = identifier + " rmr_send returned NULL";
-        mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
+        // CRITICAL EROR .. log it and return
+       std::stringstream ss;
+       ss << __FILE__ << "," << __LINE__ << " RMR send function returned NULL. Reason = " << strerror(errno) << std::endl;
+        mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
        return false;
       }
       else if (rmr_tx_message->state == RMR_OK){
@@ -185,7 +166,7 @@ bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){
       }
       else  if(rmr_tx_message->state == RMR_ERR_RETRY){
        i++;
-       _num_fails++;
+       std::this_thread::sleep_for(std::chrono::milliseconds(link_delays[mode]));
       }
       else {
        mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
@@ -240,11 +221,11 @@ void XaPP::_error_handler(rmr_mbuf_t *message){
 // Some get/set methods
 //---------------------------------------
 
-std::string XaPP::getName(void){
+std::string XaPP::get_name(void){
   return std::string(_xapp_name);
 }
 
-int XaPP::getStatus(void){
+int XaPP::get_status(void){
   return _is_ready;
 }
 
@@ -257,30 +238,6 @@ void * XaPP::get_rmr_context(void){
   return _rmr_ctx;
 }
 
-void XaPP::set_num_retries(int num_retries){
-  if (num_retries < 0 || num_retries > MAX_RETRIES){
-    throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";
-  }
-  
-  _num_retries = num_retries;
-}
-
-int XaPP::get_num_retries(void){
-  return _num_retries;
-}
-
-
-unsigned long  XaPP::get_Send_attempts(void){
-  return _num_attempts;
-};
-
-
-unsigned long XaPP::get_Send_fails(void){
-  return _num_fails;
-};
-
-
-
 
 void init_logger(const char  *AppName, mdclog_severity_t log_level)
 {