effbb78d1b482ac51f09564be2d44b4f43186217
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.cc
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #include <subscription_handler.hpp>
25 #include <errno.h>
26
27
28 subscription_handler::subscription_handler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
29   init();   
30 };
31
32 void subscription_handler::init(void){
33   
34   _data_lock = std::make_unique<std::mutex>();
35   _cv = std::make_unique<std::condition_variable>();
36   
37 }
38
39 void subscription_handler::clear(void){
40   {
41     std::lock_guard<std::mutex> lock(*(_data_lock).get());
42     requests_table.clear();
43     subscription_responses.clear();
44   }
45   
46 };
47
48 size_t subscription_handler::num_pending(void) const {
49   return requests_table.size();
50 }
51
52 size_t subscription_handler::num_complete(void) const {
53   return subscription_responses.size();
54 }
55
56
57 void subscription_handler::set_timeout(unsigned int timeout_seconds){
58   _time_out = std::chrono::seconds(timeout_seconds);
59 }
60
61 void subscription_handler::set_num_retries(unsigned int num_tries){
62   _num_retries = num_tries;
63 };
64
65
66
67 bool subscription_handler::add_request_entry(subscription_identifier id, int status){
68
69   // add entry in hash table if it does not exist
70   auto search = requests_table.find(id);
71   if(search != requests_table.end()){
72     return false;
73   }
74   
75   requests_table[id] = status;
76   return true;
77
78 };
79
80 bool subscription_handler::set_request_status(subscription_identifier id, int status){
81   
82   // change status of a request only if it exists.
83   auto search = requests_table.find(id);
84   if(search != requests_table.end()){
85     requests_table[id] = status;
86     return true;
87   }
88
89   return false;
90   
91 };
92
93
94 bool subscription_handler::delete_request_entry(subscription_identifier id){
95
96   auto search = requests_table.find(id);
97   if (search != requests_table.end()){
98     requests_table.erase(search);
99     return true;
100   }
101
102   return false;
103 };
104   
105 bool subscription_handler::add_subscription_entry(subscription_identifier id, subscription_response_helper &he){
106
107   auto search = subscription_responses.find(id);
108   if (search == subscription_responses.end()){
109     subscription_responses[id] = he;
110     return true;
111   }
112
113   return false;
114 }
115
116
117 bool subscription_handler::delete_subscription_entry(subscription_identifier id){
118
119   auto search = subscription_responses.find(id);
120   if(search == subscription_responses.end()){
121     return false;
122   }
123   else{
124     subscription_responses.erase(search);
125     return true;
126   }
127   
128 }
129
130 subscription_response_helper *  const subscription_handler::get_subscription(subscription_identifier id){
131   auto search = subscription_responses.find(id);
132   if(search == subscription_responses.end()){
133     return NULL;
134   }
135   else{
136     return &(subscription_responses[id]);
137   }
138 };
139
140
141 // Handles responses from RMR
142 void subscription_handler::Response(int message_type, unsigned char *payload, int payload_length, const char * node_id){
143
144   bool res;
145   std::string node(node_id);
146   int type;
147   int procedureCode;
148   bool valid_response  =false;
149   
150   E2N_E2AP_PDU_t * e2ap_recv;
151   asn_dec_rval_t retval;
152
153   subscription_response sub_resp;
154   subscription_delete_response sub_del_resp;
155
156   subscription_response_helper he_response;
157
158
159   e2ap_recv = 0;
160   retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
161
162   if(retval.code != RC_OK){
163     mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
164     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
165     return ;
166   }
167   
168   type = e2ap_recv->present;
169   mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
170   
171   if(type == E2N_E2AP_PDU_PR_successfulOutcome){
172     
173     procedureCode =  e2ap_recv->choice.successfulOutcome->procedureCode;
174     mdclog_write(MDCLOG_INFO, "Received E2N_E2AP PDU  successful outcome message with procedureCode = %d", procedureCode);  
175
176     if( procedureCode == E2N_ProcedureCode_id_ricSubscription){  
177       // subscription response
178       // decode the message
179       sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
180       {
181         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
182         // get the id
183         subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
184
185         // get status of id 
186         int req_status = get_request_status(id);
187         if (req_status == request_pending ){
188           res = add_subscription_entry(id, he_response);
189           if(res)
190             set_request_status(id, request_success);
191           
192           else{
193             set_request_status(id, request_duplicate);
194             mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d seems to be a duplicate. Subscription already present in subscription table\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
195           }
196           
197           valid_response = true;
198         }
199         else if (req_status > 0){
200           // we don't change status of response since it was not in pending
201           // we simply fail
202           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
203           
204         }
205         else{
206           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id %s, %d in request queue for subscription", __FILE__, __LINE__,  std::get<0>(id).c_str(), std::get<1>(id));
207         }         
208         
209       }
210       
211     }
212     
213     else if( procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
214       
215       res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
216       {
217         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
218
219         // get the id
220         subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
221         
222         int req_status = get_request_status(id);
223         if (req_status == delete_request_pending ){
224           // Remove the subscription from the table 
225           res = delete_subscription_entry(id);
226           if(res){
227             set_request_status(id, delete_request_success);
228             valid_response = true;
229           }
230           else{
231             set_request_status(id, delete_request_failed);
232             mdclog_write(MDCLOG_ERR,  "%s, %d: Error deleting subscription entry for  %s, %d", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
233             valid_response = true;
234           } 
235         }      
236         else if (req_status > 0){
237           // we don't change status since it was not in pending
238           // we simply fail
239           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d for deletion  is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, std::get<0>(id).c_str(), std::get<1>(id));
240         }
241         else{
242           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find request id  %s, %d in request queue for deletion ", __FILE__, __LINE__,  std::get<0>(id).c_str(), std::get<1>(id));
243         }
244
245       }
246     }
247
248     else{
249       mdclog_write(MDCLOG_ERR,  "%s, %d: Subscription Handler Response received E2AP PDU success  response with an non-subscription response related type  %d", __FILE__, __LINE__, procedureCode);
250     }
251     
252   }
253   
254   else if(type == E2N_E2AP_PDU_PR_unsuccessfulOutcome){
255     
256     procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
257     mdclog_write(MDCLOG_INFO, "Received E2AP PDU  unsuccessful outcome message with procedureCode = %d", procedureCode);  
258     
259     if(procedureCode == E2N_ProcedureCode_id_ricSubscription){
260       
261       sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
262       {
263         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
264
265         // get the id
266         subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
267
268         int req_status = get_request_status(id);
269         if(req_status == request_pending){
270           set_request_status(id, request_failed);
271           valid_response = true;
272           mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
273         }
274         else if (req_status > 0){
275           // we don't changet status since it was not in pending
276           // we simply fail
277           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
278         }
279         else{
280           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id %s, %d in request queue for subscription ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
281         }
282       }
283     }
284     
285     else if(procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
286       
287       res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);        
288       {
289         std::lock_guard<std::mutex> lock(*(_data_lock.get()));
290         // get the id
291         subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
292         
293         int req_status = get_request_status(id);
294         if(req_status == delete_request_pending){
295           set_request_status(id, delete_request_failed);
296           mdclog_write(MDCLOG_INFO, "Subscription delete request %s,%d failed", std::get<0>(id).c_str(), std::get<1>(id));
297           valid_response = true;
298         }
299         else if (req_status > 0){
300           mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d for deletion  is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
301         }
302         else{
303           mdclog_write(MDCLOG_ERR,  "%s, %d: Could not find id  %s,%d  in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
304         }
305         
306       }
307     }
308     else{
309       mdclog_write(MDCLOG_ERR,  "%s, %d: Susbcription Handler Response received E2AP PDU failure response with a non-subscription response related type  %d", __FILE__, __LINE__,  procedureCode);
310
311     }
312   }
313   else{
314     mdclog_write(MDCLOG_ERR,  "%s, %d: Susbcription Handler Response received E2AP PDU with non response type  %d", __FILE__, __LINE__, type);
315   }
316   
317   
318   ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
319   
320   // wake up all waiting users ...
321   if(valid_response){
322     _cv.get()->notify_all();
323   }
324   
325 }
326
327
328 int const subscription_handler::get_request_status(subscription_identifier id){
329   auto search = requests_table.find(id);
330   if (search == requests_table.end()){
331     return -1;
332   }
333   
334   return search->second;
335 }
336                                    
337  bool subscription_handler::is_subscription_entry(subscription_identifier id){
338   auto search = subscription_responses.find(id);
339   if (search != subscription_responses.end())
340     return true;
341   else
342     return false;
343 }
344
345 bool subscription_handler::is_request_entry(subscription_identifier id){
346   auto search = requests_table.find(id);
347   if (search != requests_table.end())
348     return true;
349   else
350     return false;
351 }
352
353
354 void subscription_handler::get_subscription_keys(std::vector<subscription_identifier> & key_list){
355   for(auto & e: subscription_responses){
356     key_list.push_back(e.first);
357   }
358 }