2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
24 #include <subscription_handler.hpp>
27 SubscriptionHandler::SubscriptionHandler(void){
30 _time_out = std::chrono::seconds(5);
34 // unsigned char buffer[128];
35 // size_t buf_len = 128;
37 // E2AP_PDU_t e2ap_pdu;
38 // subscription_request e2ap_sub_req;
40 // int request_id = 2;
42 // int function_id = 0;
44 // int action_type = 0;
45 // int message_type = 1;
47 // subscription_helper sgnb_add_subscr_req;
49 // //sgnb_add_subscr_req.clear();
50 // sgnb_add_subscr_req.set_request(request_id, req_seq);
51 // sgnb_add_subscr_req.set_function_id(function_id);
52 // sgnb_add_subscr_req.add_action(action_id, action_type);
53 // std::string test = "This is a test";
54 // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length());
55 // std::cout <<"Constructor ........" << std::endl;
56 // // generate the request pdu
57 // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req);
59 // mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
62 // std::cout <<"Encoded subscription request pdu " << std::endl;
67 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
73 void SubscriptionHandler::init(void){
75 _data_lock = std::make_unique<std::mutex>();
76 _cv = std::make_unique<std::condition_variable>();
80 void SubscriptionHandler::clear(void){
82 std::lock_guard<std::mutex> lock(*(_data_lock).get());
83 requests_table.clear();
84 subscription_responses.clear();
89 size_t SubscriptionHandler::num_pending(void) const {
90 return requests_table.size();
93 size_t SubscriptionHandler::num_complete(void) const {
94 return subscription_responses.size();
98 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
99 _time_out = std::chrono::seconds(timeout_seconds);
102 void SubscriptionHandler::set_timeout_flag(bool val){
103 _time_out_flag = val;
106 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
107 _num_retries = num_tries;
111 unsigned int SubscriptionHandler::get_next_id(void){
112 std::lock_guard<std::mutex> lock(*(_data_lock).get());
113 unique_request_id ++;
114 return unique_request_id;
117 bool SubscriptionHandler::add_request_entry(int id, int status){
119 // add entry in hash table if it does not exist
120 auto search = requests_table.find(id);
121 if(search != requests_table.end()){
125 requests_table[id] = status;
130 bool SubscriptionHandler::set_request_status(int id, int status){
132 // change status of a request only if it exists.
134 auto search = requests_table.find(id);
135 if(search != requests_table.end()){
136 requests_table[id] = status;
145 bool SubscriptionHandler::delete_request_entry(int id){
147 auto search = requests_table.find(id);
148 if (search != requests_table.end()){
149 requests_table.erase(search);
156 bool SubscriptionHandler::add_subscription_entry(int id, subscription_response_helper &he){
158 auto search = subscription_responses.find(id);
159 if (search == subscription_responses.end()){
160 subscription_responses[id] = he;
168 bool SubscriptionHandler::delete_subscription_entry(int id){
170 auto search = subscription_responses.find(id);
171 if(search == subscription_responses.end()){
175 subscription_responses.erase(search);
181 subscription_response_helper * const SubscriptionHandler::get_subscription(int id){
182 auto search = subscription_responses.find(id);
183 if(search == subscription_responses.end()){
187 return &(subscription_responses[id]);
192 // Handles responses from RMR
193 void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length){
199 bool valid_response =false;
201 E2AP_PDU_t * e2ap_recv;
202 asn_dec_rval_t retval;
204 subscription_response sub_resp;
205 subscription_delete_response sub_del_resp;
207 subscription_response_helper he_response;
210 size_t buf_size = 512;
213 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
215 if(retval.code != RC_OK){
216 mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
220 type = e2ap_recv->present;
221 mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
223 if(type == E2AP_PDU_PR_successfulOutcome){
225 procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;
226 mdclog_write(MDCLOG_INFO, "Received E2AP PDU successful outcome message with procedureCode = %d", procedureCode);
228 if( procedureCode == ProcedureCode_id_ricSubscription){
229 // subscription response
230 // decode the message
231 sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
233 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
235 id = he_response.get_request_id();
236 if (get_request_status(id) == request_pending ){
237 res = add_subscription_entry(id, he_response);
239 set_request_status(id, request_success);
242 set_request_status(id, request_duplicate);
244 valid_response = true;
247 std::string error_string = "Could not find id to match response = ";
248 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
255 else if( procedureCode == ProcedureCode_id_ricSubscriptionDelete){
257 res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
259 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
261 id = he_response.get_request_id();
262 if (get_request_status(id) == delete_request_pending ){
263 // Remove the subscription from the table
264 res = delete_subscription_entry(id);
266 set_request_status(id, delete_request_success);
267 valid_response = true;
270 std::string error_string = "Error deleting subscription entry for id = ";
271 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
276 std::string error_string = "Could not find id for deletion = ";
277 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
284 std::string error_string = "Handler received E2AP subscription message with Unknown procedure code =" ;
285 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), procedureCode);
290 else if(type == E2AP_PDU_PR_unsuccessfulOutcome){
292 procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
293 mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode);
295 if(procedureCode == ProcedureCode_id_ricSubscription){
297 sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
299 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
300 id = he_response.get_request_id();
302 if(get_request_status(id) == request_pending){
303 set_request_status(id, request_failed);
304 valid_response = true;
305 mdclog_write(MDCLOG_INFO, "Subscription request %d failed", id);
308 std::string error_string = "Could not find id to match response = ";
309 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
314 else if(procedureCode == ProcedureCode_id_ricSubscriptionDelete){
316 res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
318 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
320 id = he_response.get_request_id();
321 if(get_request_status(id) == delete_request_pending){
322 set_request_status(id, delete_request_failed);
323 mdclog_write(MDCLOG_INFO, "Subscription delete request %d failed", id);
324 valid_response = true;
327 std::string error_string = "Could not find id to match response = ";
328 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
336 std::string error_string = "Handler received E2AP subscription message with Unknown procedure code =" ;
337 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), procedureCode);
341 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
343 // wake up all waiting users ...
345 _cv.get()->notify_all();
351 int const SubscriptionHandler::get_request_status(int id){
352 auto search = requests_table.find(id);
353 if (search == requests_table.end()){
357 return search->second;
360 bool SubscriptionHandler::is_subscription_entry(int id){
361 auto search = subscription_responses.find(id);
362 if (search != subscription_responses.end())
368 bool SubscriptionHandler::is_request_entry(int id){
369 auto search = requests_table.find(id);
370 if (search != requests_table.end())