Added debugging and fixes for incorrect messages
[ric-app/admin.git] / src / xapp_utils.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*  Author : Ashwin Sridharan
21     Date    : Feb 2019
22 */
23
24
25 #include "xapp_utils.hpp"
26
27 // Constructor that automatically determines number of threads
28 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){
29
30   _num_threads = std::thread::hardware_concurrency();
31   strcpy(_xapp_name, xapp_name);
32   strcpy(_proto_port, proto_port);
33   init(_msg_size);
34   get_routes();
35
36 };
37   
38
39
40 // Constructor that takes number of threads as argument 
41 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2),  _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {
42
43   strcpy(_xapp_name, xapp_name);
44   strcpy(_proto_port, proto_port);
45   init(_msg_size);
46   get_routes();
47
48 };
49   
50 // Destructor closes rmr context if available
51 XaPP::~XaPP(void){
52
53   // Call stop to clear thread table
54   Stop();
55
56   // free memory
57   rmr_free_msg(_rmr_tx_message);
58   
59   if (_rmr_ctx){
60     rmr_close(_rmr_ctx);
61   }
62   // delete mutex
63   delete _transmit;
64 };
65
66 // Get the RMR context 
67 void XaPP::init(int msg_size){
68
69   if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){
70     std::stringstream ss;
71     ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size  " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;
72     ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;
73     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
74     throw ss.str();
75   }
76   
77   // Initialze the rmr context
78   if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){
79     // throw exception here ..
80     std::stringstream ss;
81     ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;
82     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
83     throw ss.str();
84   }
85   
86 }
87
88
89 // Blocks till routing table is received
90 void XaPP::get_routes(void){
91   
92   // Wait for getting a routing table ...
93   int i = 0;
94   _is_ready = 0;
95   while( i < MAX_WAIT_TIME){
96     std::cout <<"Waiting for RMR to be ready " << std::endl;
97     if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){
98       break;
99     }
100     sleep(1);
101     i++;
102   };
103
104   if(!_is_ready){
105     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
106     std::string error_string = identifier + " Error getting routing table";
107     throw std::runtime_error(error_string);
108   }
109   
110
111   // Get a tx buffer in case we need to do a transmit from the main thread itself
112   if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){
113     // throw exception here ..
114     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
115     std::string error_string = identifier + " Error getting a send buffer";
116     throw std::runtime_error(error_string);
117   }
118
119   std::cout <<"Route Table received. Send buffer allocated" << std::endl;
120   _transmit = new std::mutex();
121
122 }
123
124 // Send method that takes TLV (type/length/value) input
125 bool XaPP::Send(int type,  int payload_len, void *payload){
126
127   if (likely(_is_ready)){
128     if (likely(payload_len <= RMR_BUFFER_SIZE)){
129       _rmr_tx_message->mtype  = type;
130       memcpy(_rmr_tx_message->payload, payload, payload_len);
131       _rmr_tx_message->len = payload_len;
132       return Send(_rmr_tx_message);
133     }
134     else{
135        std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
136        std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);
137        mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
138     }
139   }
140   else{
141     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
142     std::string error_string = identifier + " rmr not ready to send";
143     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
144   }
145   
146   return false;
147 }
148
149 // Send method that takes TLV (type/length/value) input + MEID
150 bool XaPP::Send(int type,  int payload_len, void *payload, unsigned char const * meid){
151   if (!_is_ready){
152     return false;
153   }
154
155   _rmr_tx_message->mtype  = type;  
156   memcpy(_rmr_tx_message->payload, payload, payload_len);
157   _rmr_tx_message->len = payload_len;
158   rmr_str2meid(_rmr_tx_message, meid);
159   return Send(_rmr_tx_message);
160   
161 }
162
163
164 // Send method that takes a buffer
165 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){
166
167   if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE  && rmr_tx_message->len > 0)){
168     int i = 0;
169     rmr_tx_message->sub_id = RMR_VOID_SUBID;
170     while(i <= _num_retries){
171       
172       //rmr_tx_message->state = 0; // fix for nng
173       rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
174       _num_attempts ++;
175       
176       if (! rmr_tx_message){
177         // CRITICAL EROR .. log it 
178         std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
179         std::string error_string = identifier + " rmr_send returned NULL";
180         mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
181         return false;
182       }
183       else if (rmr_tx_message->state == RMR_OK){
184         return true;
185       }
186       else  if(rmr_tx_message->state == RMR_ERR_RETRY){
187         i++;
188         _num_fails++;
189       }
190       else {
191         mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
192         return false;
193       }
194     
195     }
196   }
197   else{
198     mdclog_write(MDCLOG_ERR, "Error: %s, %d  Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);
199     return false;
200   }
201   
202   std::stringstream ss;
203   ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);
204   mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
205   
206   return false;
207 }
208
209
210
211
212 void XaPP::Stop(void){
213   // Get the mutex lock 
214   std::lock_guard<std::mutex> guard(*_transmit);
215   _listen = false;
216
217   //Wait for all threads to stop
218   for(auto & t: thread_group){
219     std::thread::id my_id = t.second.get_id();
220     std::stringstream thread_id;
221     thread_id << my_id;
222     t.second.join();
223
224   }
225
226   // Clear thread table ...
227   thread_group.clear();
228   
229 }
230
231 // default error handler if non specified by user
232 // pass through for now
233 void XaPP::_error_handler(rmr_mbuf_t *message){
234 };
235
236
237
238
239 //----------------------------------------
240 // Some get/set methods
241 //---------------------------------------
242
243 std::string XaPP::getName(void){
244   return std::string(_xapp_name);
245 }
246
247 int XaPP::getStatus(void){
248   return _is_ready;
249 }
250
251 bool XaPP::_isRunning(void){
252   return _listen;
253 }
254
255
256 void * XaPP::get_rmr_context(void){
257   return _rmr_ctx;
258 }
259
260 void XaPP::set_num_retries(int num_retries){
261   if (num_retries < 0 || num_retries > MAX_RETRIES){
262     throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";
263   }
264   
265   _num_retries = num_retries;
266 }
267
268 int XaPP::get_num_retries(void){
269   return _num_retries;
270 }
271
272
273 unsigned long  XaPP::get_Send_attempts(void){
274   return _num_attempts;
275 };
276
277
278 unsigned long XaPP::get_Send_fails(void){
279   return _num_fails;
280 };
281
282
283
284
285 void init_logger(const char  *AppName, mdclog_severity_t log_level)
286 {
287     mdclog_attr_t *attr;
288     mdclog_attr_init(&attr);
289     mdclog_attr_set_ident(attr, AppName);
290     mdclog_init(attr);
291     mdclog_level_set(log_level);
292     mdclog_attr_destroy(attr);
293 }