Initial commit of Admission Control xAPP and E2AP/X2AP definitions
[ric-app/admin.git] / src / xapp_utils.hpp
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 #include <iostream>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <error.h>
25 #include <thread>
26 #include <map>
27 #include <mutex>
28 #include <sys/epoll.h>
29 #include <functional>
30 #include <sstream>
31 #include <rmr/rmr.h>
32 #include <rmr/RIC_message_types.h>
33 #include <mdclog/mdclog.h>
34
35
36 #ifndef XAPP_UTIL
37 # define XAPP_UTIL
38
39 #define DEBUG 0
40
41 #define XAPP_NAME_LENGTH 128
42 #define PROTO_PORT_LENGTH 16
43 #define MAX_RETRIES 16
44 #define MAX_WAIT_TIME 10
45 #define EPOLL_TIMEOUT 500  //in milli-seconds
46 #define RMR_TIMEOUT 50// in mill-seconds
47 #define RMR_BUFFER_SIZE 16384
48 #define MAX_THREADS 8
49
50 #ifdef __GNUC__
51 #define likely(x)  __builtin_expect((x), 1)
52 #define unlikely(x) __builtin_expect((x), 0)
53 #else
54 #define likely(x) (x)
55 #define unlikely(x) (x)
56 #endif
57
58 void init_logger(const char  *AppName, mdclog_severity_t log_level);
59
60
61 class XaPP {
62   
63  public:
64
65   XaPP(char *, char *, int);
66   XaPP(char *, char *, int, int);
67   ~XaPP(void);
68   XaPP(XaPP &&) = default;  // destructor freaks out with non-copyable thread otherwise ..
69   std::string getName(void);
70   int  getStatus(void);
71
72   // ideally can reduce tempate definitions to just two
73   // but for now leaving it open ...
74   
75   // template definition to allow a user defined
76   // processor to be started in multiple threads
77   template <typename messageProcessor>
78   void Start(messageProcessor &&);
79
80   // template definition to allow a user defined
81   // processor and  error handler if a send fails 
82   // to be started in multiple threads
83   template <typename messageProcessor, typename errorHandler>
84   void Start(messageProcessor &&, errorHandler &&);
85
86   // Template to allow a user defined processor to start
87   // on a single thread each time it is invoked
88   template <typename messageProcessor > 
89   unsigned int  StartThread(messageProcessor &&);
90
91   // Template to allow a user defined processor and 
92   // error handle to start // on a single thread each time it
93   // is invoked
94   template <typename messageProcessor , typename errorHandler> 
95   unsigned int StartThread(messageProcessor &&, errorHandler &&);
96
97   void Stop(void);
98   bool Send(int type,  int payload_len, void *payload);
99   bool Send(int type, int payload_len, void *payload, unsigned char const  *meid);
100   bool Send(rmr_mbuf_t * rmr_tx_message);
101   void * get_rmr_context(void);
102   void set_num_retries(int );
103   int  get_num_retries(void );
104   unsigned long get_Send_attempts(void);
105   unsigned long get_Send_fails(void);
106   
107 private:
108
109   void init(int);
110   void get_routes();
111   void _error_handler(rmr_mbuf_t *); // pass through placeholder
112   
113   template<typename messageProcessor, typename errorHandler>
114   void _workThread(messageProcessor &&, errorHandler &&, XaPP *);
115
116   char _xapp_name[XAPP_NAME_LENGTH];
117   char _proto_port[PROTO_PORT_LENGTH];
118
119   int _is_ready;
120   bool _listen;
121   int _num_retries;
122   int _msg_size;
123   unsigned int _num_threads;
124   unsigned long _num_attempts;
125   unsigned long _num_fails;
126
127   void* _rmr_ctx;
128   std::mutex *_transmit;
129   std::map <unsigned int, std::thread> thread_group;
130   rmr_mbuf_t * _rmr_tx_message;
131  
132   bool _isRunning(void);
133   
134 };
135
136
137 template <typename messageProcessor, typename errorHandler>
138 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){
139
140
141   // Get the thread id 
142   std::thread::id my_id = std::this_thread::get_id();
143   std::stringstream thread_id;
144   thread_id << my_id;
145
146   // Stats counters 
147   unsigned long recvs = 0;
148   unsigned long attempts = 0;
149   unsigned long fails = 0;
150  
151   // Get the rmr context 
152   void *rmr_context = parent->get_rmr_context(); 
153   if (!rmr_context){
154     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
155     std::string error_string = identifier +  " Thread : " + thread_id.str() + " Listener cannot run : no context available";
156     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
157     throw error_string;
158   }
159
160   // Get buffer specific to this thread
161   rmr_mbuf_t *rmr_message;
162   if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){
163     // throw exception here ..
164     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
165     std::string reason = strerror(errno);
166     std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;
167     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
168     throw error_string;
169   }
170
171
172   // Create an epoll instance
173   int rcv_fd, ep_fd;
174   struct epoll_event eve, trigger;
175   if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){
176     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
177     std::string reason = strerror(errno);
178     std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason;
179     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
180     throw error_string;
181   }
182
183   if( (ep_fd = epoll_create1(0) ) < 0){
184     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
185     std::string reason = strerror(errno);
186     std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll  file descriptor :" + reason;
187     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
188     throw error_string;
189   }
190
191   trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
192   trigger.data.fd = rcv_fd; 
193
194   if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){
195     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
196     std::string reason = strerror(errno);
197     std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering  epoll  file descriptor : " + reason;
198     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
199     throw error_string;
200   }
201
202   
203   int num_retries = this->get_num_retries();
204   int i = 0;
205   int num_fds = 0;
206   bool send_ok;
207   
208   mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());
209   
210   while(parent->_isRunning()){
211     num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT);
212     
213     if(num_fds && eve.data.fd == rcv_fd){
214       rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);
215       //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);
216       
217       // Re-arm the trigger
218       if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){
219         std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
220         std::string reason = strerror(errno);
221         std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason;
222         mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
223         throw error_string;
224       }
225     
226     }
227     else{
228       continue;
229     };
230   
231     if (rmr_message && rmr_message->state == RMR_OK){
232
233       recvs++;
234       bool res = msg_fn(rmr_message);
235
236       // is there anything to send 
237       if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){
238         i = 0;
239         rmr_message->sub_id = RMR_VOID_SUBID;
240         send_ok = false;
241         
242         while(i < num_retries){
243           
244           // Need to handle differently depending on whether message
245           // is for A1 (determined by type) or non-A1.
246           // For now, A1 requires we bypass the routing table and send
247           // directly back to originator using rmr_rts_msg rather than
248           // over the bus
249
250           if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){
251             rmr_message = rmr_rts_msg(rmr_context, rmr_message);
252           }
253           else{
254             rmr_message->state = 0; // fix for nng
255             rmr_message = rmr_send_msg(rmr_context, rmr_message);   
256           }
257           attempts ++;
258
259           if (! rmr_message){
260             // CRITICAL  error. break out of loop
261             std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
262             std::string error_string = identifier + " rmr_send returned NULL";
263             mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
264             break;
265           }
266           else if (rmr_message->state == RMR_OK){
267             send_ok = true;
268             break;
269           }
270           
271           else  if(rmr_message->state == RMR_ERR_RETRY){
272             i++;
273             fails++;
274           }
275           else{
276             mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno));
277             break;
278           }
279          
280         }
281           
282         if (send_ok == false){
283           error_handler(rmr_message);
284         }
285         
286       }
287       
288     }
289   }
290   
291
292   // Clean up 
293   try{
294     rmr_free_msg(rmr_message);
295   }
296   catch(std::runtime_error &e){
297     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
298     std::string error_string = identifier = " Error freeing RMR message ";
299     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
300   }
301   
302   mdclog_write(MDCLOG_INFO, "Finished  thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);
303 }
304
305
306 template <typename messageProcessor>
307 void XaPP::Start(messageProcessor && msg_fn){
308
309   std::lock_guard<std::mutex> guard(*_transmit);
310   _listen = true;
311
312   // Spin up the the workThreads ..... 
313   for(unsigned int i = 0; i < _num_threads; i++){
314     thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
315   }
316   
317 };
318
319 // template if calling function provides an error handler also
320 template <typename messageProcessor, typename errorHandler>
321 void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){
322
323   std::lock_guard<std::mutex> guard(*_transmit);
324   _listen = true;
325
326   // Spin up the the workThreads ..... 
327   for(unsigned int i = 0; i < _num_threads; i++){
328     //std::cout << "Starting thread number " << i << std::endl;
329     thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
330
331   }
332
333   
334 };
335
336 // Template to allow a user defined processor to start
337 // on a specific thread 
338 template <typename messageProcessor>
339 unsigned int XaPP::StartThread(messageProcessor && msg_fn){
340
341   std::lock_guard<std::mutex> guard(*_transmit);
342   _listen = true;
343
344   _num_threads++;
345   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));
346   return _num_threads;
347
348 };
349
350
351 // Template to allow a user defined processor and error handler to start
352 // on a specific thread 
353 template <typename messageProcessor, typename  errorHandler>
354 unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){
355
356   std::lock_guard<std::mutex> guard(*_transmit);
357   _listen = true;
358
359   _num_threads++;
360   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));
361   return _num_threads;
362
363 };
364
365
366 #endif