Initialization script now passes signal to xapp process
[ric-app/admin.git] / src / xapp_utils.hpp
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 #include <iostream>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <error.h>
25 #include <assert.h>
26 #include <thread>
27 #include <map>
28 #include <mutex>
29 #include <sys/epoll.h>
30 #include <functional>
31 #include <sstream>
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34 #include <mdclog/mdclog.h>
35
36
37 #ifndef XAPP_UTIL
38 # define XAPP_UTIL
39
40
41 #define XAPP_NAME_LENGTH 128
42 #define PROTO_PORT_LENGTH 16
43 #define MAX_RETRIES 16
44 #define MAX_WAIT_TIME 10
45 #define EPOLL_TIMEOUT 500  //in milli-seconds
46 #define RMR_TIMEOUT 50// in mill-seconds
47 #define RMR_BUFFER_SIZE 16384
48 #define MAX_THREADS 8
49
50 #ifdef __GNUC__
51 #define likely(x)  __builtin_expect((x), 1)
52 #define unlikely(x) __builtin_expect((x), 0)
53 #else
54 #define likely(x) (x)
55 #define unlikely(x) (x)
56 #endif
57
58
59 // define RMR Send behaviour for different link-types
60 // controls how often we try and delay between tries, as well as method
61
62 enum  link_types {LOW_LATENCY, HIGH_RELIABILITY};
63 static const unsigned int link_delays[] = {1, 10}; // milli-seconds to wait before retries
64 static const unsigned int link_retries[] = {4, 15}; // number of times to retry
65 enum  tx_types {ROUTE, RTS}; // regular rmr or rts
66
67 void init_logger(const char  *AppName, mdclog_severity_t log_level);
68
69
70 class XaPP {
71   
72  public:
73
74   XaPP(char *, char *, int);
75   ~XaPP(void);
76   XaPP(XaPP &&) = default;  // destructor freaks out with non-copyable thread otherwise ..
77
78   std::string get_name(void);
79
80   int  get_status(void);
81
82   size_t  get_num_active_threads(void) const { return thread_group.size(); };
83   
84   // ideally can reduce tempate definitions to just two
85   // but for now leaving it open ...
86   
87   // Template to allow a user defined processor to start
88   // on a thread 
89   template <typename messageProcessor > 
90   unsigned int  StartThread(messageProcessor &&);
91
92   // Template to allow a user defined processor AND
93   // error handle to start // on a single thread each time it
94   // is invoked
95   template <typename messageProcessor , typename errorHandler> 
96   unsigned int StartThread(messageProcessor &&, errorHandler &&);
97
98   void Stop(void);
99
100   // various flavours of send : first two finally call the last
101   bool Send(int type,  size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
102   bool Send(int type, size_t  payload_len, void *payload, unsigned char const  *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
103   bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE);
104   
105   void * get_rmr_context(void);
106   
107 private:
108
109   void init(int);
110   void get_routes();
111   void _error_handler(rmr_mbuf_t *); // pass through placeholder
112   
113   template<typename messageProcessor, typename errorHandler>
114   void _workThread(messageProcessor &&, errorHandler &&, XaPP *);
115
116   char _xapp_name[XAPP_NAME_LENGTH];
117   char _proto_port[PROTO_PORT_LENGTH];
118
119   int _is_ready;
120   bool _listen;
121   int _num_retries;
122   int _retry_interval;
123   int _msg_size;
124   unsigned int _num_threads;
125
126   void* _rmr_ctx;
127   std::mutex *_transmit;
128   std::map <unsigned int, std::thread> thread_group;
129   rmr_mbuf_t * _rmr_tx_message;
130  
131   bool _isRunning(void);
132   
133 };
134
135 // main workhorse thread which does the listen->process->respond loop 
136 template <typename messageProcessor, typename errorHandler>
137 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
138
139
140   // Get the thread id 
141   std::thread::id my_id = std::this_thread::get_id();
142   std::stringstream thread_id;
143   std::stringstream ss;
144   
145   thread_id << my_id;
146
147   // Stats counters 
148   unsigned long recvs = 0;
149   unsigned long attempts = 0;
150   unsigned long fails = 0;
151  
152   // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe)
153   void *rmr_context = parent->get_rmr_context(); 
154   assert(rmr_context != NULL);
155   
156   // Get buffer specific to this thread
157   rmr_mbuf_t *rmr_message = NULL;
158   rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE);
159   assert(rmr_message != NULL);
160
161   // Create an epoll instance
162   int rcv_fd, ep_fd;
163   struct epoll_event eve, trigger;
164   rcv_fd = rmr_get_rcvfd(rmr_context);
165   assert(rcv_fd > 0);
166   
167   ep_fd = epoll_create1(0);
168   assert(ep_fd > 0);
169   
170   trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
171   trigger.data.fd = rcv_fd; 
172
173   if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
174     ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl;
175     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
176     throw std::runtime_error(ss.str());
177   }
178
179   
180   int num_fds = 0;
181   bool send_ok;
182   
183   mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());
184
185   // the workhorse loop 
186   while(parent->_isRunning()){
187     num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
188     
189     if(num_fds && eve.data.fd == rcv_fd){
190       rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);
191       //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);
192       
193       // Re-arm the trigger
194       if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
195         ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl;
196         mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
197         throw std::runtime_error(ss.str());
198       }
199     
200     }
201     else{
202       continue;
203     };
204   
205     if (rmr_message && rmr_message->state == RMR_OK){
206
207       recvs++;
208       bool res = msg_fn(rmr_message);
209
210       // is there anything to send ?
211       if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
212
213         rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ? 
214         send_ok = false;
215         
216         if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){
217           // for a1 messages we use send in high reliability mode and RTS
218           send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS);
219         }
220         else{
221           rmr_message->state = 0; // fix for nng
222           send_ok = Send(rmr_message);   
223         }
224         attempts ++;
225         
226         if (send_ok == false){
227           error_handler(rmr_message);
228           fails ++;
229         }
230         
231       }
232       
233     }
234   }
235   
236
237   // Clean up 
238   try{
239     rmr_free_msg(rmr_message);
240   }
241   catch(std::runtime_error &e){
242     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
243     std::string error_string = identifier = " Error freeing RMR message ";
244     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
245   }
246   
247   mdclog_write(MDCLOG_INFO, "Finished  thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
248 }
249
250 // Template to allow a user defined processor to start
251 // on a specific thread 
252 template <typename messageProcessor>
253 unsigned int XaPP::StartThread(messageProcessor && msg_fn){
254
255   std::lock_guard<std::mutex> guard(*_transmit);
256   _listen = true;
257
258   _num_threads++;
259   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
260   return _num_threads;
261
262 };
263
264
265 // Template to allow a user defined processor and error handler to start
266 // on a specific thread 
267 template <typename messageProcessor, typename  errorHandler>
268 unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){
269
270   std::lock_guard<std::mutex> guard(*_transmit);
271   _listen = true;
272
273   _num_threads++;
274   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
275   return _num_threads;
276
277 };
278
279
280 #endif