Added debugging and fixes for incorrect messages
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.cc
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #include <subscription_handler.hpp>
25 #include <errno.h>
26
27 SubscriptionHandler::SubscriptionHandler(void){
28
29   init();
30   _time_out =  std::chrono::seconds(5);
31   _num_retries = 2;
32
33   // bool res;
34   // unsigned char buffer[128];
35   // size_t buf_len = 128;
36   
37   // E2AP_PDU_t e2ap_pdu;
38   // subscription_request e2ap_sub_req;
39
40   // int request_id = 2;
41   // int req_seq = 1;
42   // int function_id = 0;
43   // int action_id = 0;
44   // int action_type = 0;
45   // int message_type = 1;
46   
47   // subscription_helper sgnb_add_subscr_req;
48   
49   // //sgnb_add_subscr_req.clear();
50   // sgnb_add_subscr_req.set_request(request_id, req_seq);
51   // sgnb_add_subscr_req.set_function_id(function_id);
52   // sgnb_add_subscr_req.add_action(action_id, action_type);
53   // std::string test = "This is a test";
54   // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length());
55   // std::cout <<"Constructor ........" << std::endl;
56   // // generate the request pdu
57   // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req);
58   // if(! res){
59   //   mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
60    
61   // }
62   // std::cout <<"Encoded subscription request pdu " << std::endl;
63  
64  
65 }
66
67 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
68   init();
69
70    
71 };
72
73 void SubscriptionHandler::init(void){
74   
75   _data_lock = std::make_unique<std::mutex>();
76   _cv = std::make_unique<std::condition_variable>();
77   
78 }
79
80 void SubscriptionHandler::clear(void){
81   {
82     std::lock_guard<std::mutex> lock(*(_data_lock).get());
83     requests_table.clear();
84     subscription_responses.clear();
85   }
86   
87 };
88
89 size_t SubscriptionHandler::num_pending(void) const {
90   return requests_table.size();
91 }
92
93 size_t SubscriptionHandler::num_complete(void) const {
94   return subscription_responses.size();
95 }
96
97
98 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
99   _time_out = std::chrono::seconds(timeout_seconds);
100 }
101
102 void SubscriptionHandler::set_timeout_flag(bool val){
103   _time_out_flag = val;
104 }
105
106 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
107   _num_retries = num_tries;
108 };
109
110
111 unsigned int SubscriptionHandler::get_next_id(void){
112   std::lock_guard<std::mutex> lock(*(_data_lock).get());
113   unique_request_id ++;
114   return unique_request_id;
115 }
116
117 bool SubscriptionHandler::add_request_entry(int id, int status){
118
119   // add entry in hash table if it does not exist
120   auto search = requests_table.find(id);
121   if(search != requests_table.end()){
122     return false;
123   }
124   
125   requests_table[id] = status;
126   return true;
127
128 };
129
130 bool SubscriptionHandler::set_request_status(int id, int status){
131   
132   // change status of a request only if it exists.
133
134   auto search = requests_table.find(id);
135   if(search != requests_table.end()){
136     requests_table[id] = status;
137     return true;
138   }
139
140   return false;
141   
142 };
143
144
145 bool SubscriptionHandler::delete_request_entry(int id){
146
147   auto search = requests_table.find(id);
148   if (search != requests_table.end()){
149     requests_table.erase(search);
150     return true;
151   }
152
153   return false;
154 };
155   
156 bool SubscriptionHandler::add_subscription_entry(int id, subscription_response_helper &he){
157
158   auto search = subscription_responses.find(id);
159   if (search == subscription_responses.end()){
160     subscription_responses[id] = he;
161     return true;
162   }
163
164   return false;
165 }
166
167
168 bool SubscriptionHandler::delete_subscription_entry(int id){
169
170   auto search = subscription_responses.find(id);
171   if(search == subscription_responses.end()){
172     return false;
173   }
174   else{
175     subscription_responses.erase(search);
176     return true;
177   }
178   
179 }
180
181 subscription_response_helper *  const SubscriptionHandler::get_subscription(int id){
182   auto search = subscription_responses.find(id);
183   if(search == subscription_responses.end()){
184     return NULL;
185   }
186   else{
187     return &(subscription_responses[id]);
188   }
189 };
190
191
192 // Handles responses from RMR
193 void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length){
194
195   bool res;
196   int id;
197   int type;
198   int procedureCode;
199   bool valid_response  =false;
200   
201   E2AP_PDU_t * e2ap_recv;
202   asn_dec_rval_t retval;
203
204   subscription_response sub_resp;
205   subscription_delete_response sub_del_resp;
206
207   subscription_response_helper he_response;
208
209   char buf[512];
210   size_t buf_size = 512;
211
212   e2ap_recv = 0;
213   retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
214
215   if(retval.code != RC_OK){
216     mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
217     ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
218     return ;
219   }
220   
221   type = e2ap_recv->present;
222   mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
223   
224   if(type == E2AP_PDU_PR_successfulOutcome){
225     
226     procedureCode =  e2ap_recv->choice.successfulOutcome->procedureCode;
227     mdclog_write(MDCLOG_INFO, "Received E2AP PDU  successful outcome message with procedureCode = %d", procedureCode);  
228
229     if( procedureCode == ProcedureCode_id_ricSubscription){  
230       // subscription response
231       // decode the message
232       sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
233
234       {
235         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
236         // get the id
237         id = he_response.get_request_id();
238         // get status of id 
239         int req_status = get_request_status(id);
240         if (req_status == request_pending ){
241           res = add_subscription_entry(id, he_response);
242           if(res)
243             set_request_status(id, request_success);
244           
245           else{
246             set_request_status(id, request_duplicate);
247             mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d seems to be a duplicate\n", __FILE__, __LINE__, id);
248           }
249           
250           valid_response = true;
251         }
252         else if (req_status > 0){
253           // we don't change status of response since it was not in pending
254           // we simply fail
255           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
256           
257         }
258         else{
259           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id %d in request queue for subscription", __FILE__, __LINE__,  id);
260         }         
261         
262       }
263       
264     }
265     
266     else if( procedureCode == ProcedureCode_id_ricSubscriptionDelete){
267       
268       res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
269       {
270         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
271         // get the id
272         id = he_response.get_request_id();
273         int req_status = get_request_status(id);
274         if (req_status == delete_request_pending ){
275           // Remove the subscription from the table 
276           res = delete_subscription_entry(id);
277           if(res){
278             set_request_status(id, delete_request_success);
279             valid_response = true;
280           }
281           else{
282             set_request_status(id, delete_request_failed);
283             std::string error_string = "Error deleting subscription entry for id = ";
284             mdclog_write(MDCLOG_ERR,  "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
285             valid_response = true;
286           } 
287         }      
288         else if (req_status > 0){
289           // we don't change status since it was not in pending
290           // we simply fail
291           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion  is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
292         }
293         else{
294           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id  %d in request queue for deletion ", __FILE__, __LINE__,  id);
295         }
296
297       }
298     }
299
300     else{
301       mdclog_write(MDCLOG_ERR,  "%s, %d: Subscription Handler Response received E2AP PDU success  response with an non-subscription response related type  %d", __FILE__, __LINE__, procedureCode);
302     }
303     
304   }
305   
306   else if(type == E2AP_PDU_PR_unsuccessfulOutcome){
307     
308     procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
309     mdclog_write(MDCLOG_INFO, "Received E2AP PDU  unsuccessful outcome message with procedureCode = %d", procedureCode);  
310     
311     if(procedureCode == ProcedureCode_id_ricSubscription){
312       
313       sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
314       {
315         std::lock_guard<std::mutex> lock(*(_data_lock.get()));  
316         id = he_response.get_request_id();
317         int req_status = get_request_status(id);
318         if(req_status == request_pending){
319           set_request_status(id, request_failed);
320           valid_response = true;
321           mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
322         }
323         else if (req_status > 0){
324           // we don't changet status since it was not in pending
325           // we simply fail
326           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
327         }
328         else{
329           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id   %d in request queue for subscription ", __FILE__, __LINE__, id);
330         }
331       }
332     }
333     
334     else if(procedureCode == ProcedureCode_id_ricSubscriptionDelete){
335       
336       res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);        
337       {
338         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
339         // get the id
340         id = he_response.get_request_id();
341         int req_status = get_request_status(id);
342         if(req_status == delete_request_pending){
343           set_request_status(id, delete_request_failed);
344           mdclog_write(MDCLOG_INFO, "Subscription delete request %d failed", id);
345           valid_response = true;
346         }
347         else if (req_status > 0){
348           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion  is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
349         }
350         else{
351           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id  %d  in request queue for deletion ", __FILE__, __LINE__, id);
352         }
353         
354       }
355     }
356     else{
357       mdclog_write(MDCLOG_ERR,  "%s, %d: Susbcription Handler Response received E2AP PDU failure response with a non-subscription response related type  %d", __FILE__, __LINE__,  procedureCode);
358
359     }
360   }
361   else{
362     mdclog_write(MDCLOG_ERR,  "%s, %d: Susbcription Handler Response received E2AP PDU with non response type  %d", __FILE__, __LINE__, type);
363   }
364   
365   
366   ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
367   
368   // wake up all waiting users ...
369   if(valid_response){
370     _cv.get()->notify_all();
371   }
372   
373 }
374
375
376 int const SubscriptionHandler::get_request_status(int id){
377   auto search = requests_table.find(id);
378   if (search == requests_table.end()){
379     return -1;
380   }
381   
382   return search->second;
383 }
384                                    
385  bool SubscriptionHandler::is_subscription_entry(int id){
386   auto search = subscription_responses.find(id);
387   if (search != subscription_responses.end())
388     return true;
389   else
390     return false;
391 }
392
393 bool SubscriptionHandler::is_request_entry(int id){
394   auto search = requests_table.find(id);
395   if (search != requests_table.end())
396     return true;
397   else
398     return false;
399 }