2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 #include <sys/epoll.h>
33 #include <rmr/RIC_message_types.h>
34 #include <mdclog/mdclog.h>
// Size of the fixed character buffer that holds the xAPP name (_xapp_name).
#define XAPP_NAME_LENGTH 128
// Size of the fixed character buffer that holds the protocol/port string (_proto_port).
#define PROTO_PORT_LENGTH 16
// NOTE(review): not referenced in the visible part of this file; presumably a retry limit -- confirm.
#define MAX_RETRIES 16
// NOTE(review): not referenced in the visible part of this file; the unit is not stated here -- confirm.
#define MAX_WAIT_TIME 10
#define EPOLL_TIMEOUT 500 // timeout passed to epoll_wait() in the worker loop, in milli-seconds
#define RMR_TIMEOUT 50 // timeout passed to rmr_torcv_msg() in the worker loop, in milli-seconds
#define RMR_BUFFER_SIZE 16384 // size requested from rmr_alloc_msg(); also the upper bound accepted for a reply's len
// Branch-prediction hints built on the __builtin_expect compiler intrinsic:
// likely(x) marks x as expected to be true, unlikely(x) as expected to be false.
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
// Plain pass-through form of unlikely(x) that carries no hint.
// NOTE(review): the conditional-compilation directives that would select between
// this definition and the ones above are not visible in this view -- confirm.
#define unlikely(x) (x)
// RMR send behaviour differs per link type: the link type controls how often
// a send is retried and how long to wait between tries, as well as the method.
enum link_types {
    LOW_LATENCY = 0,
    HIGH_RELIABILITY = 1
};

// Milli-seconds to wait before retries; one entry per link type
// (presumably indexed by link_types -- the lookup is not in this view).
static const unsigned int link_delays[2] = {1U, 10U};

// Number of times to retry; one entry per link type
// (presumably indexed by link_types -- the lookup is not in this view).
static const unsigned int link_retries[2] = {4U, 15U};

// Transmission method: regular rmr (ROUTE) or rts (RTS).
enum tx_types {
    ROUTE = 0,
    RTS = 1
};
// Logger set-up hook; only the declaration is visible here.
//   AppName   : application name handed to the logger
//   log_level : mdclog severity value
// NOTE(review): presumably log_level is the minimum severity that gets emitted --
// confirm against the definition.
void init_logger(const char *AppName, mdclog_severity_t log_level);
// Constructor taking two C strings and an int.
// NOTE(review): the parameters are unnamed here; given the _xapp_name, _proto_port
// and _num_threads members they are presumably (name, protocol/port, thread count) -- confirm.
XaPP(char *, char *, int);
// Defaulted move constructor.
XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise ..
// Returns a std::string; going by its name, the xAPP name (definition not visible here).
std::string get_name(void);
82 size_t get_num_active_threads(void) const { return thread_group.size(); };
// Ideally the template definitions could be reduced to just two,
// but for now this is left open ...
// Starts a worker thread that runs a user-defined message processor;
// the built-in _error_handler is bound as the error handler.
template <typename messageProcessor >
unsigned int StartThread(messageProcessor &&);
// Starts a worker thread that runs a user-defined message processor AND
// a user-defined error handler.
template <typename messageProcessor , typename errorHandler>
unsigned int StartThread(messageProcessor &&, errorHandler &&);
// Various flavours of send: the first two finally call the last.
// The worker loop treats a false return as a failed send and invokes the error handler.
//   type, payload_len, payload : message type and payload to send
//   mode      : link type, defaults to LOW_LATENCY
//   send_type : transmission method, defaults to ROUTE
bool Send(int type, size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
// Same as above with an additional meid argument.
bool Send(int type, size_t payload_len, void *payload, unsigned char const *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
// Sends an already prepared rmr message buffer.
bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
// Returns the rmr context; every worker thread fetches it through this accessor,
// so all threads and the parent use the same context.
void * get_rmr_context(void);
// Bound as the default error handler by the one-argument StartThread.
void _error_handler(rmr_mbuf_t *); // pass through placeholder
// Worker loop run on each thread started by StartThread:
// wait for a message, process it and optionally send the result.
template<typename messageProcessor, typename errorHandler>
void _workThread(messageProcessor &&, errorHandler &&, XaPP *);
char _xapp_name[XAPP_NAME_LENGTH]; // fixed-size buffer for the xAPP name
char _proto_port[PROTO_PORT_LENGTH]; // fixed-size buffer for the protocol/port string
unsigned int _num_threads; // used as the key under which StartThread stores a new thread
std::mutex *_transmit; // locked by StartThread while a new thread is registered
std::map <unsigned int, std::thread> thread_group; // worker threads keyed by an unsigned id
rmr_mbuf_t * _rmr_tx_message; // NOTE(review): presumably the buffer used by the payload-based Send overloads -- confirm
bool _isRunning(void); // polled by the worker loop, which exits once this returns false
135 // main workhorse thread which does the listen->process->respond loop
136 template <typename messageProcessor, typename errorHandler>
137 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
141 std::thread::id my_id = std::this_thread::get_id();
142 std::stringstream thread_id;
143 std::stringstream ss;
148 unsigned long recvs = 0;
149 unsigned long attempts = 0;
150 unsigned long fails = 0;
152 // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe)
153 void *rmr_context = parent->get_rmr_context();
154 assert(rmr_context != NULL);
156 // Get buffer specific to this thread
157 rmr_mbuf_t *rmr_message = NULL;
158 rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE);
159 assert(rmr_message != NULL);
161 // Create an epoll instance
163 struct epoll_event eve, trigger;
164 rcv_fd = rmr_get_rcvfd(rmr_context);
167 ep_fd = epoll_create1(0);
170 trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
171 trigger.data.fd = rcv_fd;
173 if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
174 ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl;
175 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
176 throw std::runtime_error(ss.str());
183 mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str());
185 // the workhorse loop
186 while(parent->_isRunning()){
187 num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
189 if(num_fds && eve.data.fd == rcv_fd){
190 rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);
191 //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);
193 // Re-arm the trigger
194 if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
195 ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl;
196 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
197 throw std::runtime_error(ss.str());
205 if (rmr_message && rmr_message->state == RMR_OK){
208 bool res = msg_fn(rmr_message);
210 // is there anything to send ?
211 if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
213 rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ?
216 if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){
217 // for a1 messages we use send in high reliability mode and RTS
218 send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS);
221 rmr_message->state = 0; // fix for nng
222 send_ok = Send(rmr_message);
226 if (send_ok == false){
227 error_handler(rmr_message);
239 rmr_free_msg(rmr_message);
241 catch(std::runtime_error &e){
242 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
243 std::string error_string = identifier = " Error freeing RMR message ";
244 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
247 mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
// Starts a worker thread that runs a user-defined message processor,
// with the built-in _error_handler bound as the error handler.
//   msg_fn : callable handed to _workThread as the message processor
// NOTE(review): the return statement is not visible in this view; presumably the
// key of the new thread is returned -- confirm.
template <typename messageProcessor>
unsigned int XaPP::StartThread(messageProcessor && msg_fn){
// Hold the _transmit mutex until guard goes out of scope while the thread is registered
std::lock_guard<std::mutex> guard(*_transmit);
// The new std::thread is stored in thread_group under the key _num_threads.
// NOTE(review): the lambda captures by reference ([&]), so the thread uses the
// caller's msg_fn object in place; that object has to stay alive for as long as the
// thread runs -- passing a temporary would leave the thread with a dangling reference.
thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
// Starts a worker thread that runs a user-defined message processor and a
// user-defined error handler.
//   msg_fn        : callable handed to _workThread as the message processor
//   error_handler : callable handed to _workThread as the error handler
// NOTE(review): the return statement is not visible in this view; presumably the
// key of the new thread is returned -- confirm.
template <typename messageProcessor, typename errorHandler>
unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){
// Hold the _transmit mutex until guard goes out of scope while the thread is registered
std::lock_guard<std::mutex> guard(*_transmit);
// The new std::thread is stored in thread_group under the key _num_threads.
// NOTE(review): the lambda captures by reference ([&]), so the thread uses the
// caller's msg_fn and error_handler objects in place; both have to stay alive for as
// long as the thread runs -- passing temporaries would leave dangling references.
thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));