2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
24 #include <subscription_handler.hpp>
// Default constructor. The visible statement sets _time_out to 5 seconds.
// NOTE(review): some original lines of this constructor are not visible in
// this excerpt, so further initialization may happen here — confirm against
// the full file before relying on this description.
27 SubscriptionHandler::SubscriptionHandler(void){
30 _time_out = std::chrono::seconds(5);
// The remainder is commented-out (never compiled) code that fills a
// subscription_helper with sample values and encodes it into an E2AP
// subscription request PDU.
34 // unsigned char buffer[128];
35 // size_t buf_len = 128;
37 // E2AP_PDU_t e2ap_pdu;
38 // subscription_request e2ap_sub_req;
40 // int request_id = 2;
42 // int function_id = 0;
44 // int action_type = 0;
45 // int message_type = 1;
47 // subscription_helper sgnb_add_subscr_req;
49 // //sgnb_add_subscr_req.clear();
50 // sgnb_add_subscr_req.set_request(request_id, req_seq);
51 // sgnb_add_subscr_req.set_function_id(function_id);
52 // sgnb_add_subscr_req.add_action(action_id, action_type);
53 // std::string test = "This is a test";
54 // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length());
55 // std::cout <<"Constructor ........" << std::endl;
56 // // generate the request pdu
57 // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req);
59 // mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
62 // std::cout <<"Encoded subscription request pdu " << std::endl;
// Constructs a handler from caller-supplied settings. The member initializer
// list converts timeout_seconds to std::chrono::seconds for _time_out and
// stores num_tries in _num_retries.
// NOTE(review): the constructor body is not visible in this excerpt.
67 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
// Creates the synchronization objects used by the handler: a std::mutex
// stored in _data_lock and a std::condition_variable stored in _cv.
73 void SubscriptionHandler::init(void){
75 _data_lock = std::make_unique<std::mutex>();
76 _cv = std::make_unique<std::condition_variable>();
// Empties both requests_table and subscription_responses while holding
// _data_lock.
80 void SubscriptionHandler::clear(void){
82 std::lock_guard<std::mutex> lock(*(_data_lock).get());
83 requests_table.clear();
84 subscription_responses.clear();
// Returns the number of entries currently held in requests_table.
// NOTE(review): the size is read without taking _data_lock, unlike clear();
// confirm that callers tolerate a value that may race with updates.
89 size_t SubscriptionHandler::num_pending(void) const {
90 return requests_table.size();
// Returns the number of entries currently held in subscription_responses.
// NOTE(review): the size is read without taking _data_lock, unlike clear();
// confirm that callers tolerate a value that may race with updates.
93 size_t SubscriptionHandler::num_complete(void) const {
94 return subscription_responses.size();
// Replaces _time_out with timeout_seconds converted to std::chrono::seconds.
98 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
99 _time_out = std::chrono::seconds(timeout_seconds);
// Stores val in _time_out_flag.
// NOTE(review): how the flag is consumed is not visible in this excerpt.
102 void SubscriptionHandler::set_timeout_flag(bool val){
103 _time_out_flag = val;
// Stores num_tries in _num_retries.
106 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
107 _num_retries = num_tries;
// Increments unique_request_id while holding _data_lock and returns the
// incremented value.
111 unsigned int SubscriptionHandler::get_next_id(void){
112 std::lock_guard<std::mutex> lock(*(_data_lock).get());
113 unique_request_id ++;
114 return unique_request_id;
// Records `status` for request `id` in requests_table.
// The lookup separates an id that is already present from a new one; the
// final assignment stores the status under id.
// NOTE(review): the body of the `if` and the return statements are not
// visible in this excerpt — presumably an existing id is rejected before the
// assignment is reached; confirm against the full file.
// This function takes no lock itself in the visible code.
117 bool SubscriptionHandler::add_request_entry(int id, int status){
119 // add entry in hash table if it does not exist
120 auto search = requests_table.find(id);
121 if(search != requests_table.end()){
125 requests_table[id] = status;
// Overwrites the stored status of request `id`, but only when `id` is already
// present in requests_table.
// NOTE(review): the return statements are not visible in this excerpt.
// This function takes no lock itself in the visible code.
130 bool SubscriptionHandler::set_request_status(int id, int status){
132 // change status of a request only if it exists.
134 auto search = requests_table.find(id);
135 if(search != requests_table.end()){
136 requests_table[id] = status;
// Removes request `id` from requests_table when it is present, erasing via
// the iterator returned by find().
// NOTE(review): the return statements are not visible in this excerpt.
145 bool SubscriptionHandler::delete_request_entry(int id){
147 auto search = requests_table.find(id);
148 if (search != requests_table.end()){
149 requests_table.erase(search);
// Stores a copy of `he` under `id` in subscription_responses, but only when
// no entry for `id` exists yet.
// NOTE(review): the return statements are not visible in this excerpt.
156 bool SubscriptionHandler::add_subscription_entry(int id, subscription_response_helper &he){
158 auto search = subscription_responses.find(id);
159 if (search == subscription_responses.end()){
160 subscription_responses[id] = he;
// Removes the entry for `id` from subscription_responses, erasing via the
// iterator returned by find().
// NOTE(review): the body of the not-found `if` and the return statements are
// not visible in this excerpt — presumably the function returns before the
// erase when `id` is absent; confirm against the full file.
168 bool SubscriptionHandler::delete_subscription_entry(int id){
170 auto search = subscription_responses.find(id);
171 if(search == subscription_responses.end()){
175 subscription_responses.erase(search);
// Returns a pointer to the response stored for `id`. The pointer refers to
// the element held inside subscription_responses.
// NOTE(review): the body of the not-found `if` is not visible in this
// excerpt — presumably a null pointer is returned for an unknown id; confirm.
181 subscription_response_helper * const SubscriptionHandler::get_subscription(int id){
182 auto search = subscription_responses.find(id);
183 if(search == subscription_responses.end()){
187 return &(subscription_responses[id]);
192 // Handles responses from RMR
// Decodes `payload` (payload_length bytes) as an aligned basic PER E2AP PDU
// and updates the request / subscription tables according to what it holds:
//   - successful outcome, ricSubscription: a request in request_pending has
//     its response stored through add_subscription_entry.
//   - successful outcome, ricSubscriptionDelete: a request in
//     delete_request_pending has its entry removed through
//     delete_subscription_entry.
//   - unsuccessful outcome for either procedure: a pending request is marked
//     request_failed / delete_request_failed.
// Other PDU types and other procedure codes are only logged.
// In the visible code message_type is used only in the decode-error log.
// NOTE(review): several lines of this function (declarations, `if`/`else`
// lines, returns, closing braces) are not visible in this excerpt; the
// comments below describe only the statements that are shown.
193 void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length){
199 bool valid_response =false;
201 E2AP_PDU_t * e2ap_recv;
202 asn_dec_rval_t retval;
204 subscription_response sub_resp;
205 subscription_delete_response sub_del_resp;
207 subscription_response_helper he_response;
// NOTE(review): buf_size is not referenced anywhere in the visible code.
210 size_t buf_size = 512;
// Decode the raw payload into e2ap_recv.
213 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
// Decode failure: log how many bytes were consumed and free the PDU.
// NOTE(review): presumably the function returns here — the line is not visible.
215 if(retval.code != RC_OK){
216 mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
217 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
// Dispatch on the top-level PDU choice.
221 type = e2ap_recv->present;
222 mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
224 if(type == E2AP_PDU_PR_successfulOutcome){
226 procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;
227 mdclog_write(MDCLOG_INFO, "Received E2AP PDU successful outcome message with procedureCode = %d", procedureCode);
229 if( procedureCode == ProcedureCode_id_ricSubscription){
230 // subscription response
231 // decode the message
232 sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
// The table lookup and updates below are made with _data_lock held.
235 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
237 id = he_response.get_request_id();
239 int req_status = get_request_status(id);
240 if (req_status == request_pending ){
241 res = add_subscription_entry(id, he_response);
// NOTE(review): the condition selecting between request_success and
// request_duplicate is not visible — presumably it tests `res`; confirm.
243 set_request_status(id, request_success);
246 set_request_status(id, request_duplicate);
247 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d seems to be a duplicate\n", __FILE__, __LINE__, id);
250 valid_response = true;
252 else if (req_status > 0){
253 // we don't change status of response since it was not in pending
255 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
259 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription", __FILE__, __LINE__, id);
// Successful subscription-delete response.
266 else if( procedureCode == ProcedureCode_id_ricSubscriptionDelete){
268 res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
// The table lookup and updates below are made with _data_lock held.
270 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
272 id = he_response.get_request_id();
273 int req_status = get_request_status(id);
274 if (req_status == delete_request_pending ){
275 // Remove the subscription from the table
276 res = delete_subscription_entry(id);
// NOTE(review): the condition selecting between delete_request_success and
// delete_request_failed is not visible — presumably it tests `res`; confirm.
278 set_request_status(id, delete_request_success);
279 valid_response = true;
282 set_request_status(id, delete_request_failed);
283 std::string error_string = "Error deleting subscription entry for id = ";
284 mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id);
285 valid_response = true;
288 else if (req_status > 0){
289 // we don't change status since it was not in pending
291 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
294 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for deletion ", __FILE__, __LINE__, id);
// Successful outcome for a procedure this handler does not process.
301 mdclog_write(MDCLOG_ERR, "%s, %d: Subscription Handler Response received E2AP PDU success response with an non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
// Unsuccessful outcome: a matching pending request is marked as failed.
306 else if(type == E2AP_PDU_PR_unsuccessfulOutcome){
308 procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
309 mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode);
311 if(procedureCode == ProcedureCode_id_ricSubscription){
313 sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
// The table lookup and updates below are made with _data_lock held.
315 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
316 id = he_response.get_request_id();
317 int req_status = get_request_status(id);
318 if(req_status == request_pending){
319 set_request_status(id, request_failed);
320 valid_response = true;
321 mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
323 else if (req_status > 0){
324 // we don't change status since it was not in pending
326 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
329 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription ", __FILE__, __LINE__, id);
// Failed subscription-delete response.
334 else if(procedureCode == ProcedureCode_id_ricSubscriptionDelete){
336 res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
// The table lookup and updates below are made with _data_lock held.
338 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
340 id = he_response.get_request_id();
341 int req_status = get_request_status(id);
342 if(req_status == delete_request_pending){
343 set_request_status(id, delete_request_failed);
344 mdclog_write(MDCLOG_INFO, "Subscription delete request %d failed", id);
345 valid_response = true;
347 else if (req_status > 0){
348 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
351 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for deletion ", __FILE__, __LINE__, id);
// Unsuccessful outcome for a procedure this handler does not process.
357 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU failure response with a non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
// PDU that is neither a successful nor an unsuccessful outcome.
362 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU with non response type %d", __FILE__, __LINE__, type);
// Release the decoded PDU now that all fields have been read from it.
366 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
368 // wake up all waiting users ...
// NOTE(review): a line is missing just before the notify — presumably it is
// guarded by valid_response, which the handled paths above set; confirm.
370 _cv.get()->notify_all();
// Looks up `id` in requests_table and returns the status stored for it.
// NOTE(review): the value returned when `id` is absent is not visible in this
// excerpt. Response() treats a status that is neither the expected pending
// state nor > 0 as "not found", so presumably a non-positive sentinel is
// returned; confirm against the full file.
// This function takes no lock itself in the visible code.
376 int const SubscriptionHandler::get_request_status(int id){
377 auto search = requests_table.find(id);
378 if (search == requests_table.end()){
382 return search->second;
// Tests whether `id` has an entry in subscription_responses.
// NOTE(review): the return statements are not visible in this excerpt.
385 bool SubscriptionHandler::is_subscription_entry(int id){
386 auto search = subscription_responses.find(id);
387 if (search != subscription_responses.end())
// Tests whether `id` has an entry in requests_table.
// NOTE(review): the return statements are not visible in this excerpt.
393 bool SubscriptionHandler::is_request_entry(int id){
394 auto search = requests_table.find(id);
395 if (search != requests_table.end())