2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
20 /* Author : Ashwin Sridharan
25 #include "xapp_utils.hpp"
27 // Constructor that automatically determines number of threads
28 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){
30 _num_threads = std::thread::hardware_concurrency();
31 strcpy(_xapp_name, xapp_name);
32 strcpy(_proto_port, proto_port);
40 // Constructor that takes number of threads as argument
41 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {
43 strcpy(_xapp_name, xapp_name);
44 strcpy(_proto_port, proto_port);
50 // Destructor closes rmr context if available
53 // Call stop to clear thread table
57 rmr_free_msg(_rmr_tx_message);
66 // Get the RMR context
67 void XaPP::init(int msg_size){
69 if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){
71 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;
72 ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;
73 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
77 // Initialze the rmr context
78 if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){
79 // throw exception here ..
81 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;
82 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
89 // Blocks till routing table is received
90 void XaPP::get_routes(void){
92 // Wait for getting a routing table ...
95 while( i < MAX_WAIT_TIME){
96 std::cout <<"Waiting for RMR to be ready " << std::endl;
97 if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){
105 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
106 std::string error_string = identifier + " Error getting routing table";
107 throw std::runtime_error(error_string);
111 // Get a tx buffer in case we need to do a transmit from the main thread itself
112 if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){
113 // throw exception here ..
114 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
115 std::string error_string = identifier + " Error getting a send buffer";
116 throw std::runtime_error(error_string);
119 std::cout <<"Route Table received. Send buffer allocated" << std::endl;
120 _transmit = new std::mutex();
124 // Send method that takes TLV (type/length/value) input
125 bool XaPP::Send(int type, int payload_len, void *payload){
127 if (likely(_is_ready)){
128 if (likely(payload_len <= RMR_BUFFER_SIZE)){
129 _rmr_tx_message->mtype = type;
130 memcpy(_rmr_tx_message->payload, payload, payload_len);
131 _rmr_tx_message->len = payload_len;
132 return Send(_rmr_tx_message);
135 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
136 std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);
137 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
141 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
142 std::string error_string = identifier + " rmr not ready to send";
143 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
149 // Send method that takes TLV (type/length/value) input + MEID
150 bool XaPP::Send(int type, int payload_len, void *payload, unsigned char const * meid){
155 _rmr_tx_message->mtype = type;
156 memcpy(_rmr_tx_message->payload, payload, payload_len);
157 _rmr_tx_message->len = payload_len;
158 rmr_str2meid(_rmr_tx_message, meid);
159 return Send(_rmr_tx_message);
164 // Send method that takes a buffer
165 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){
167 if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){
169 rmr_tx_message->sub_id = RMR_VOID_SUBID;
170 while(i <= _num_retries){
172 //rmr_tx_message->state = 0; // fix for nng
173 rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
176 if (! rmr_tx_message){
177 // CRITICAL EROR .. log it
178 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
179 std::string error_string = identifier + " rmr_send returned NULL";
180 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
183 else if (rmr_tx_message->state == RMR_OK){
186 else if(rmr_tx_message->state == RMR_ERR_RETRY){
191 mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
198 mdclog_write(MDCLOG_ERR, "Error: %s, %d Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);
202 std::stringstream ss;
203 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);
204 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
212 void XaPP::Stop(void){
213 // Get the mutex lock
214 std::lock_guard<std::mutex> guard(*_transmit);
217 //Wait for all threads to stop
218 for(auto & t: thread_group){
219 std::thread::id my_id = t.second.get_id();
220 std::stringstream thread_id;
226 // Clear thread table ...
227 thread_group.clear();
231 // default error handler if non specified by user
232 // pass through for now
233 void XaPP::_error_handler(rmr_mbuf_t *message){
239 //----------------------------------------
240 // Some get/set methods
241 //---------------------------------------
243 std::string XaPP::getName(void){
244 return std::string(_xapp_name);
247 int XaPP::getStatus(void){
251 bool XaPP::_isRunning(void){
256 void * XaPP::get_rmr_context(void){
260 void XaPP::set_num_retries(int num_retries){
261 if (num_retries < 0 || num_retries > MAX_RETRIES){
262 throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";
265 _num_retries = num_retries;
268 int XaPP::get_num_retries(void){
273 unsigned long XaPP::get_Send_attempts(void){
274 return _num_attempts;
278 unsigned long XaPP::get_Send_fails(void){
285 void init_logger(const char *AppName, mdclog_severity_t log_level)
288 mdclog_attr_init(&attr);
289 mdclog_attr_set_ident(attr, AppName);
291 mdclog_level_set(log_level);
292 mdclog_attr_destroy(attr);