2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
20 /* Author : Ashwin Sridharan
25 #include "xapp_utils.hpp"
// XaPP constructor.
// The member-initializer list sets _is_ready to 0, _listen to false and
// _msg_size to msg_size; the body then copies the two C strings into the
// members _xapp_name and _proto_port.
//   xapp_name  : NUL-terminated name, copied into _xapp_name
//   proto_port : NUL-terminated port string, copied into _proto_port
//   msg_size   : stored in _msg_size
// NOTE(review): strcpy does no bounds checking and the declarations of
// _xapp_name / _proto_port are not visible here -- confirm their capacity
// covers every string a caller can pass, or use a length-limited copy.
// NOTE(review): the end of the constructor body is not visible in this view;
// any further setup it performs is not documented here.
28 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _msg_size(msg_size) {
30 strcpy(_xapp_name, xapp_name);
31 strcpy(_proto_port, proto_port);
37 // Destructor: closes the RMR context if one is available
// NOTE(review): the destructor's signature line and the statements the two
// comments here refer to (stopping threads, closing the context) are not
// visible in this view; only the buffer release below is.
40 // Call Stop to clear the thread table
// Hand the transmit buffer held in _rmr_tx_message back to RMR.
44 rmr_free_msg(_rmr_tx_message);
54 // Get the RMR context
// Validates msg_size and then creates the RMR context.
//   msg_size : requested RMR message size; accepted when
//              0 < msg_size <= RMR_BUFFER_SIZE.
// An out-of-range msg_size is reported through mdclog at MDCLOG_ERR level.
// NOTE(review): the declaration of `ss` and whatever follows the error log
// (return / throw) are not visible in this view -- confirm that an invalid
// size does not fall through to rmr_init.
55 void XaPP::init(int msg_size){
57 if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){
59 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;
60 ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;
// NOTE(review): the assembled text is passed in the format-string position of
// mdclog_write (other calls in this file pass a printf-style format there);
// a '%' in the text would be read as a conversion. Passing "%s" plus the
// text would be safer.
61 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
65 // Initialize the rmr context
// The context is created on _proto_port with msg_size and RMRFL_NONE and is
// stored in _rmr_ctx.
66 _rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE);
// NOTE(review): assert is compiled out when NDEBUG is defined, so a NULL
// context would go undetected in such builds.
67 assert(_rmr_ctx != NULL);
72 // Blocks till routing table is received
// Polls rmr_ready(_rmr_ctx), storing each result in _is_ready, inside a loop
// bounded by MAX_WAIT_TIME. Afterwards it allocates the transmit buffer
// _rmr_tx_message and the mutex _transmit.
// Throws std::runtime_error carrying the file name and line number with the
// text " Error getting routing table".
// NOTE(review): the counter `i`, the rest of the loop body and the condition
// guarding the throw are not visible in this view; presumably the throw is
// taken when RMR is still not ready after the loop -- confirm.
73 void XaPP::get_routes(void){
75 // Wait for getting a routing table ...
78 while( i < MAX_WAIT_TIME){
79 std::cout <<"Waiting for RMR to be ready " << std::endl;
// A return value of 1 from rmr_ready is treated as "ready".
80 if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){
88 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
89 std::string error_string = identifier + " Error getting routing table";
90 throw std::runtime_error(error_string);
94 // Get a tx buffer in case we need to do a transmit from the main thread itself
// The buffer is requested with RMR_BUFFER_SIZE, independent of the msg_size
// that was passed to init().
95 _rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE);
// NOTE(review): assert is compiled out when NDEBUG is defined.
96 assert(_rmr_tx_message != NULL);
97 std::cout <<"Route Table received. Send buffer allocated" << std::endl;
// NOTE(review): raw `new`; the matching delete is not visible in this view --
// confirm the mutex is released when the object is destroyed.
98 _transmit = new std::mutex();
102 // Send method that takes TLV (type/length/value) input
// Fills the object's own transmit buffer (_rmr_tx_message) and hands it to the
// Send(rmr_mbuf_t *, ...) overload, returning that overload's result.
//   type        : written to the buffer's mtype field
//   payload_len : number of bytes copied; must be <= RMR_BUFFER_SIZE
//   payload     : source bytes for the copy (not checked for NULL here)
//   mode        : forwarded to the buffer overload
//   send_type   : forwarded to the buffer overload
// A payload longer than RMR_BUFFER_SIZE is not copied; an error is logged
// through mdclog instead.
// NOTE(review): the value returned on the oversize path is not visible in
// this view.
// NOTE(review): _rmr_tx_message is a member shared by every call and no lock
// is taken in the visible lines -- confirm callers serialise access.
103 bool XaPP::Send(int type, size_t payload_len, void *payload, link_types mode, tx_types send_type){
105 if (likely(payload_len <= RMR_BUFFER_SIZE)){
106 _rmr_tx_message->mtype = type;
107 memcpy(_rmr_tx_message->payload, payload, payload_len);
108 _rmr_tx_message->len = payload_len;
109 return Send(_rmr_tx_message, mode, send_type);
112 std::stringstream ss;
113 ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
// NOTE(review): the assembled text is used as the format string of
// mdclog_write; a '%' in it would be interpreted as a conversion.
114 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
120 // Send method that takes TLV (type/length/value) input + MEID
// Same flow as the TLV overload, with one extra step: the meid string is
// applied to the buffer through rmr_str2meid before the buffer is handed to
// the Send(rmr_mbuf_t *, ...) overload, whose result is returned.
//   type        : written to the buffer's mtype field
//   payload_len : number of bytes copied; must be <= RMR_BUFFER_SIZE
//   payload     : source bytes for the copy (not checked for NULL here)
//   meid        : passed to rmr_str2meid together with the buffer
//   mode        : forwarded to the buffer overload
//   send_type   : forwarded to the buffer overload
// A payload longer than RMR_BUFFER_SIZE is not copied; an error is logged
// through mdclog instead.
// NOTE(review): the value returned on the oversize path is not visible in
// this view, and the result of rmr_str2meid is not examined.
// NOTE(review): _rmr_tx_message is a member shared by every call and no lock
// is taken in the visible lines -- confirm callers serialise access.
121 bool XaPP::Send(int type, size_t payload_len, void *payload, unsigned char const * meid, link_types mode, tx_types send_type){
122 if (likely(payload_len <= RMR_BUFFER_SIZE)){
123 _rmr_tx_message->mtype = type;
124 memcpy(_rmr_tx_message->payload, payload, payload_len);
125 _rmr_tx_message->len = payload_len;
126 rmr_str2meid(_rmr_tx_message, meid);
127 return Send(_rmr_tx_message, mode, send_type);
130 std::stringstream ss;
131 ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
// NOTE(review): the assembled text is used as the format string of
// mdclog_write; a '%' in it would be interpreted as a conversion.
132 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
139 // Send method that takes a buffer
// Transmits an already-filled RMR buffer, retrying on RMR_ERR_RETRY.
//   rmr_tx_message : buffer to send; its len must be in (0, RMR_BUFFER_SIZE]
//                    (dereferenced without a NULL check)
//   mode           : index into link_retries (loop bound) and link_delays
//                    (sleep between attempts, in milliseconds)
//   send_type      : ROUTE sends with rmr_send_msg; the other visible call is
//                    rmr_rts_msg
// Nothing is sent unless _is_ready is non-zero and the length is in range;
// otherwise the "Invalid state/message" error is logged.
// NOTE(review): the counter `i`, its increment, the `else` keywords and every
// return statement are missing from this view, so the exact return value of
// each path cannot be documented here.
140 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message, link_types mode, tx_types send_type){
142 if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){
144 rmr_tx_message->sub_id = RMR_VOID_SUBID;
146 while(i <= link_retries[mode]){
148 //rmr_tx_message->state = 0; // fix for nng
// NOTE(review): the send calls' result is assigned to the by-value parameter
// only. The TLV overloads pass the member _rmr_tx_message, which therefore
// never sees a replaced or NULL buffer -- confirm against RMR's rules for the
// buffer returned by rmr_send_msg / rmr_rts_msg.
150 if(likely(send_type == ROUTE)){
151 rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
154 rmr_tx_message = rmr_rts_msg(_rmr_ctx, rmr_tx_message);
157 if (! rmr_tx_message){
158 // CRITICAL ERROR .. log it and return
159 std::stringstream ss;
160 ss << __FILE__ << "," << __LINE__ << " RMR send function returned NULL. Reason = " << strerror(errno) << std::endl;
161 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
// State RMR_OK: the send went through (the statement taken here is not visible).
164 else if (rmr_tx_message->state == RMR_OK){
// State RMR_ERR_RETRY: pause for link_delays[mode] milliseconds before the
// next attempt.
167 else if(rmr_tx_message->state == RMR_ERR_RETRY){
169 std::this_thread::sleep_for(std::chrono::milliseconds(link_delays[mode]));
// Any other state is logged together with strerror(errno).
172 mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
// Precondition failure: not ready, or message length out of range.
179 mdclog_write(MDCLOG_ERR, "Error: %s, %d Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);
// Final failure report carrying the message type, its size and strerror(errno).
183 std::stringstream ss;
184 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);
185 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
// Walks the entries of thread_group and then empties the table, holding the
// mutex *_transmit for the whole call.
// NOTE(review): the remainder of the loop body (how each thread is actually
// waited for, and what thread_id is used for) is not visible in this view.
// NOTE(review): _transmit is dereferenced unconditionally; in this file it is
// assigned in get_routes() -- confirm Stop() cannot run before that.
193 void XaPP::Stop(void){
194 // Get the mutex lock
// lock_guard releases the mutex automatically when the function scope ends.
195 std::lock_guard<std::mutex> guard(*_transmit);
198 //Wait for all threads to stop
199 for(auto & t: thread_group){
// t.second is the entry's thread object; its id is captured here.
200 std::thread::id my_id = t.second.get_id();
201 std::stringstream thread_id;
207 // Clear thread table ...
208 thread_group.clear();
212 // default error handler if none specified by user
213 // pass through for now
//   message : the RMR buffer handed to the handler
// NOTE(review): the function body is not visible in this view.
214 void XaPP::_error_handler(rmr_mbuf_t *message){
220 //----------------------------------------
221 // Some get/set methods
222 //---------------------------------------
// Returns the xApp name as a std::string built from the member _xapp_name
// (a copy; later changes to the member do not affect the returned value).
224 std::string XaPP::get_name(void){
225 return std::string(_xapp_name);
// Status accessor returning an int.
// NOTE(review): the body is not visible in this view; presumably it reports
// _is_ready -- confirm.
228 int XaPP::get_status(void){
// Boolean query on the object's running state.
// NOTE(review): the body is not visible in this view; presumably it reports
// _listen -- confirm.
232 bool XaPP::_isRunning(void){
// Accessor returning an untyped pointer.
// NOTE(review): the body is not visible in this view; presumably it returns
// _rmr_ctx -- confirm.
237 void * XaPP::get_rmr_context(void){
// Configures the mdclog logger for this process.
//   AppName   : identity string set on the logger attributes
//   log_level : severity threshold passed to mdclog_level_set
// NOTE(review): the declaration of `attr` and any call made between
// mdclog_attr_set_ident and mdclog_level_set are not visible in this view.
// The return values of the mdclog_attr_* calls are not examined in the
// visible lines.
242 void init_logger(const char *AppName, mdclog_severity_t log_level)
245 mdclog_attr_init(&attr);
246 mdclog_attr_set_ident(attr, AppName);
248 mdclog_level_set(log_level);
// The attribute object is released once the logger has been configured.
249 mdclog_attr_destroy(attr);