2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <sys/epoll.h>
32 #include <rmr/RIC_message_types.h>
33 #include <mdclog/mdclog.h>
41 #define XAPP_NAME_LENGTH 128
42 #define PROTO_PORT_LENGTH 16
43 #define MAX_RETRIES 16
44 #define MAX_WAIT_TIME 10
45 #define EPOLL_TIMEOUT 500 //in milli-seconds
46 #define RMR_TIMEOUT 50// in mill-seconds
47 #define RMR_BUFFER_SIZE 16384
51 #define likely(x) __builtin_expect((x), 1)
52 #define unlikely(x) __builtin_expect((x), 0)
55 #define unlikely(x) (x)
58 void init_logger(const char *AppName, mdclog_severity_t log_level);
65 XaPP(char *, char *, int);
66 XaPP(char *, char *, int, int);
68 XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise ..
69 std::string getName(void);
72 // ideally can reduce tempate definitions to just two
73 // but for now leaving it open ...
75 // template definition to allow a user defined
76 // processor to be started in multiple threads
77 template <typename messageProcessor>
78 void Start(messageProcessor &&);
80 // template definition to allow a user defined
81 // processor and error handler if a send fails
82 // to be started in multiple threads
83 template <typename messageProcessor, typename errorHandler>
84 void Start(messageProcessor &&, errorHandler &&);
86 // Template to allow a user defined processor to start
87 // on a single thread each time it is invoked
88 template <typename messageProcessor >
89 unsigned int StartThread(messageProcessor &&);
91 // Template to allow a user defined processor and
92 // error handle to start // on a single thread each time it
94 template <typename messageProcessor , typename errorHandler>
95 unsigned int StartThread(messageProcessor &&, errorHandler &&);
98 bool Send(int type, int payload_len, void *payload);
99 bool Send(int type, int payload_len, void *payload, unsigned char const *meid);
100 bool Send(rmr_mbuf_t * rmr_tx_message);
101 void * get_rmr_context(void);
102 void set_num_retries(int );
103 int get_num_retries(void );
104 unsigned long get_Send_attempts(void);
105 unsigned long get_Send_fails(void);
111 void _error_handler(rmr_mbuf_t *); // pass through placeholder
113 template<typename messageProcessor, typename errorHandler>
114 void _workThread(messageProcessor &&, errorHandler &&, XaPP *);
116 char _xapp_name[XAPP_NAME_LENGTH];
117 char _proto_port[PROTO_PORT_LENGTH];
123 unsigned int _num_threads;
124 unsigned long _num_attempts;
125 unsigned long _num_fails;
128 std::mutex *_transmit;
129 std::map <unsigned int, std::thread> thread_group;
130 rmr_mbuf_t * _rmr_tx_message;
132 bool _isRunning(void);
137 template <typename messageProcessor, typename errorHandler>
138 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
142 std::thread::id my_id = std::this_thread::get_id();
143 std::stringstream thread_id;
147 unsigned long recvs = 0;
148 unsigned long attempts = 0;
149 unsigned long fails = 0;
151 // Get the rmr context
152 void *rmr_context = parent->get_rmr_context();
154 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
155 std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available";
156 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
160 // Get buffer specific to this thread
161 rmr_mbuf_t *rmr_message;
162 if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){
163 // throw exception here ..
164 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
165 std::string reason = strerror(errno);
166 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;
167 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
172 // Create an epoll instance
174 struct epoll_event eve, trigger;
175 if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){
176 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
177 std::string reason = strerror(errno);
178 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason;
179 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
183 if( (ep_fd = epoll_create1(0) ) < 0){
184 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
185 std::string reason = strerror(errno);
186 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll file descriptor :" + reason;
187 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
191 trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
192 trigger.data.fd = rcv_fd;
194 if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
195 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
196 std::string reason = strerror(errno);
197 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering epoll file descriptor : " + reason;
198 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
203 int num_retries = this->get_num_retries();
208 mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str());
210 while(parent->_isRunning()){
211 num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
213 if(num_fds && eve.data.fd == rcv_fd){
214 rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);
215 //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);
217 // Re-arm the trigger
218 if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
219 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
220 std::string reason = strerror(errno);
221 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason;
222 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
231 if (rmr_message && rmr_message->state == RMR_OK){
234 bool res = msg_fn(rmr_message);
236 // is there anything to send
237 if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
239 rmr_message->sub_id = RMR_VOID_SUBID;
242 while(i < num_retries){
244 // Need to handle differently depending on whether message
245 // is for A1 (determined by type) or non-A1.
246 // For now, A1 requires we bypass the routing table and send
247 // directly back to originator using rmr_rts_msg rather than
250 if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){
251 rmr_message = rmr_rts_msg(rmr_context, rmr_message);
254 rmr_message->state = 0; // fix for nng
255 rmr_message = rmr_send_msg(rmr_context, rmr_message);
260 // CRITICAL error. break out of loop
261 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
262 std::string error_string = identifier + " rmr_send returned NULL";
263 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
266 else if (rmr_message->state == RMR_OK){
271 else if(rmr_message->state == RMR_ERR_RETRY){
276 mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno));
282 if (send_ok == false){
283 error_handler(rmr_message);
294 rmr_free_msg(rmr_message);
296 catch(std::runtime_error &e){
297 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
298 std::string error_string = identifier = " Error freeing RMR message ";
299 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
302 mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
306 template <typename messageProcessor>
307 void XaPP::Start(messageProcessor && msg_fn){
309 std::lock_guard<std::mutex> guard(*_transmit);
312 // Spin up the the workThreads .....
313 for(unsigned int i = 0; i < _num_threads; i++){
314 thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
319 // template if calling function provides an error handler also
320 template <typename messageProcessor, typename errorHandler>
321 void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){
323 std::lock_guard<std::mutex> guard(*_transmit);
326 // Spin up the the workThreads .....
327 for(unsigned int i = 0; i < _num_threads; i++){
328 //std::cout << "Starting thread number " << i << std::endl;
329 thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
336 // Template to allow a user defined processor to start
337 // on a specific thread
338 template <typename messageProcessor>
339 unsigned int XaPP::StartThread(messageProcessor && msg_fn){
341 std::lock_guard<std::mutex> guard(*_transmit);
345 thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
351 // Template to allow a user defined processor and error handler to start
352 // on a specific thread
353 template <typename messageProcessor, typename errorHandler>
354 unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){
356 std::lock_guard<std::mutex> guard(*_transmit);
360 thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));