2 ==================================================================================
\r
4 Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.
\r
6 Licensed under the Apache License, Version 2.0 (the "License");
\r
7 you may not use this file except in compliance with the License.
\r
8 You may obtain a copy of the License at
\r
10 http://www.apache.org/licenses/LICENSE-2.0
\r
12 Unless required by applicable law or agreed to in writing, software
\r
13 distributed under the License is distributed on an "AS IS" BASIS,
\r
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
15 See the License for the specific language governing permissions and
\r
16 limitations under the License.
\r
17 ==================================================================================
\r
28 #include <functional>
\r
30 #include <rmr/rmr.h>
\r
31 #include <rmr/RIC_message_types.h>
\r
32 #include <mdclog/mdclog.h>
\r
33 #include <hiredis/hiredis.h>
\r
41 #define XAPP_NAME_LENGTH 128
\r
42 #define PROTO_PORT_LENGTH 16
\r
43 #define MAX_RETRIES 16
\r
44 #define MAX_WAIT_TIME 10
\r
45 #define RMR_TIMEOUT 50// receive timeout handed to rmr_torcv_msg, in milliseconds
\r
46 #define RMR_BUFFER_SIZE 16384
\r
47 #define MAX_THREADS 8
\r
50 #define likely(x) __builtin_expect((x), 1)
\r
51 #define unlikely(x) __builtin_expect((x), 0)
\r
53 #define likely(x) (x)
\r
54 #define unlikely(x) (x)
\r
57 void init_logger(const char *AppName, mdclog_severity_t log_level);
\r
64 XaPP(char *, char *, int, int);
\r
65 XaPP(char *, char *, int, int, int);
\r
67 XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise ..
\r
68 std::string getName(void);
\r
69 int getStatus(void);
\r
71 // ideally can reduce template definitions to just two
\r
72 // but for now leaving it open ...
\r
74 // template definition to allow a user defined
\r
75 // processor to be started in multiple threads
\r
76 template <typename messageProcessor>
\r
77 void Start(messageProcessor &&);
\r
79 // template definition to allow a user defined
\r
80 // processor and error handler if a send fails
\r
81 // to be started in multiple threads
\r
82 template <typename messageProcessor, typename errorHandler>
\r
83 void Start(messageProcessor &&, errorHandler &&);
\r
85 // Template to allow a user defined processor to start
\r
86 // on a single thread each time it is invoked
\r
87 template <typename messageProcessor >
\r
88 unsigned int StartThread(messageProcessor &&);
\r
90 // Template to allow a user defined processor and
\r
91 // error handler to start on a single thread each time it is invoked
\r
93 template <typename messageProcessor , typename errorHandler>
\r
94 unsigned int StartThread(messageProcessor &&, errorHandler &&);
\r
97 bool Send(int type, int payload_len, void *payload);
\r
98 bool Send(int type, int payload_len, void *payload, unsigned char const *meid);
\r
99 bool Send(rmr_mbuf_t * rmr_tx_message);
\r
100 void * get_rmr_context(void);
\r
101 void set_num_retries(int );
\r
102 int get_num_retries(void );
\r
103 unsigned long get_Send_attempts(void);
\r
104 unsigned long get_Send_fails(void);
\r
111 void _error_handler(rmr_mbuf_t *); // pass through placeholder
\r
113 template<typename messageProcessor, typename errorHandler>
\r
114 void _workThread(messageProcessor &&, errorHandler &&, XaPP *);
\r
116 char _xapp_name[XAPP_NAME_LENGTH];
\r
117 char _proto_port[PROTO_PORT_LENGTH];
\r
124 unsigned int _num_threads;
\r
125 unsigned long _num_attempts;
\r
126 unsigned long _num_fails;
\r
130 std::mutex *_transmit;
\r
131 std::map <unsigned int, std::thread> thread_group;
\r
132 rmr_mbuf_t * _rmr_tx_message;
\r
134 bool _isRunning(void);
\r
139 template <typename messageProcessor, typename errorHandler>
\r
140 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
\r
143 // Get the thread id
\r
144 std::thread::id my_id = std::this_thread::get_id();
\r
145 std::stringstream thread_id;
\r
146 thread_id << my_id;
\r
149 unsigned long recvs = 0;
\r
150 unsigned long attempts = 0;
\r
151 unsigned long fails = 0;
\r
153 // Get the rmr context
\r
154 void *rmr_context = parent->get_rmr_context();
\r
156 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
157 std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available";
\r
158 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
159 throw error_string;
\r
162 // Get buffer specific to this thread
\r
163 rmr_mbuf_t *rmr_message;
\r
164 if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){
\r
165 // throw exception here ..
\r
166 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
167 std::string reason = strerror(errno);
\r
168 std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;
\r
169 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
170 throw error_string;
\r
173 int num_retries = this->get_num_retries();
\r
175 mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str());
\r
177 while(parent->_isRunning()){
\r
179 rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);
\r
180 //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);
\r
182 if (rmr_message && rmr_message->state == RMR_OK){
\r
184 bool res = msg_fn(c,rmr_message);
\r
191 rmr_free_msg(rmr_message);
\r
193 catch(std::runtime_error &e){
\r
194 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
195 std::string error_string = identifier = " Error freeing RMR message ";
\r
196 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
199 mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
\r
203 template <typename messageProcessor>
\r
204 void XaPP::Start(messageProcessor && msg_fn){
\r
206 std::lock_guard<std::mutex> guard(*_transmit);
\r
209 // Spin up the workThreads .....
\r
210 for(unsigned int i = 0; i < _num_threads; i++){
\r
211 thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
\r
216 // template if calling function provides an error handler also
\r
217 template <typename messageProcessor, typename errorHandler>
\r
218 void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){
\r
220 std::lock_guard<std::mutex> guard(*_transmit);
\r
223 // Spin up the workThreads .....
\r
224 for(unsigned int i = 0; i < _num_threads; i++){
\r
225 //std::cout << "Starting thread number " << i << std::endl;
\r
226 thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
\r
233 // Template to allow a user defined processor to start
\r
234 // on a specific thread
\r
235 template <typename messageProcessor>
\r
236 unsigned int XaPP::StartThread(messageProcessor && msg_fn){
\r
238 std::lock_guard<std::mutex> guard(*_transmit);
\r
242 thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
\r
243 return _num_threads;
\r
248 // Template to allow a user defined processor and error handler to start
\r
249 // on a specific thread
\r
250 template <typename messageProcessor, typename errorHandler>
\r
251 unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){
\r
253 std::lock_guard<std::mutex> guard(*_transmit);
\r
257 thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
\r
258 return _num_threads;
\r