--- /dev/null
+/*\r
+==================================================================================\r
+ Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include <subscription_handler.hpp>\r
+#include <errno.h>\r
+\r
+SubscriptionHandler::SubscriptionHandler(void){\r
+\r
+ init();\r
+ _time_out = std::chrono::seconds(5);\r
+ _num_retries = 2;\r
+\r
+ // bool res;\r
+ // unsigned char buffer[128];\r
+ // size_t buf_len = 128;\r
+ \r
+ // E2AP_PDU_t e2ap_pdu;\r
+ // subscription_request e2ap_sub_req;\r
+\r
+ // int request_id = 2;\r
+ // int req_seq = 1;\r
+ // int function_id = 0;\r
+ // int action_id = 0;\r
+ // int action_type = 0;\r
+ // int message_type = 1;\r
+ \r
+ // subscription_helper sgnb_add_subscr_req;\r
+ \r
+ // //sgnb_add_subscr_req.clear();\r
+ // sgnb_add_subscr_req.set_request(request_id, req_seq);\r
+ // sgnb_add_subscr_req.set_function_id(function_id);\r
+ // sgnb_add_subscr_req.add_action(action_id, action_type);\r
+ // std::string test = "This is a test";\r
+ // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length());\r
+ // std::cout <<"Constructor ........" << std::endl;\r
+ // // generate the request pdu\r
+ // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req);\r
+ // if(! res){\r
+ // mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);\r
+ \r
+ // }\r
+ // std::cout <<"Encoded subscription request pdu " << std::endl;\r
+ \r
+ \r
+}\r
+\r
+SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){\r
+ init();\r
+\r
+ \r
+};\r
+\r
+void SubscriptionHandler::init(void){\r
+ \r
+ _data_lock = std::make_unique<std::mutex>();\r
+ _cv = std::make_unique<std::condition_variable>();\r
+ \r
+}\r
+\r
+void SubscriptionHandler::clear(void){\r
+ {\r
+ std::lock_guard<std::mutex> lock(*(_data_lock).get());\r
+ requests_table.clear();\r
+ subscription_responses.clear();\r
+ }\r
+ \r
+};\r
+\r
+size_t SubscriptionHandler::num_pending(void) const {\r
+ return requests_table.size();\r
+}\r
+\r
+size_t SubscriptionHandler::num_complete(void) const {\r
+ return subscription_responses.size();\r
+}\r
+\r
+\r
+void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){\r
+ _time_out = std::chrono::seconds(timeout_seconds);\r
+}\r
+\r
+void SubscriptionHandler::set_timeout_flag(bool val){\r
+ _time_out_flag = val;\r
+}\r
+\r
+void SubscriptionHandler::set_num_retries(unsigned int num_tries){\r
+ _num_retries = num_tries;\r
+};\r
+\r
+\r
+unsigned int SubscriptionHandler::get_next_id(void){\r
+ std::lock_guard<std::mutex> lock(*(_data_lock).get());\r
+ unique_request_id ++;\r
+ return unique_request_id;\r
+}\r
+\r
+bool SubscriptionHandler::add_request_entry(int id, int status){\r
+\r
+ // add entry in hash table if it does not exist\r
+ auto search = requests_table.find(id);\r
+ if(search != requests_table.end()){\r
+ return false;\r
+ }\r
+ \r
+ requests_table[id] = status;\r
+ return true;\r
+\r
+};\r
+\r
+bool SubscriptionHandler::set_request_status(int id, int status){\r
+ \r
+ // change status of a request only if it exists.\r
+\r
+ auto search = requests_table.find(id);\r
+ if(search != requests_table.end()){\r
+ requests_table[id] = status;\r
+ return true;\r
+ }\r
+\r
+ return false;\r
+ \r
+};\r
+\r
+\r
+bool SubscriptionHandler::delete_request_entry(int id){\r
+\r
+ auto search = requests_table.find(id);\r
+ if (search != requests_table.end()){\r
+ requests_table.erase(search);\r
+ return true;\r
+ }\r
+\r
+ return false;\r
+};\r
+ \r
+bool SubscriptionHandler::add_subscription_entry(int id, subscription_response_helper &he){\r
+\r
+ auto search = subscription_responses.find(id);\r
+ if (search == subscription_responses.end()){\r
+ subscription_responses[id] = he;\r
+ return true;\r
+ }\r
+\r
+ return false;\r
+}\r
+\r
+\r
+bool SubscriptionHandler::delete_subscription_entry(int id){\r
+\r
+ auto search = subscription_responses.find(id);\r
+ if(search == subscription_responses.end()){\r
+ return false;\r
+ }\r
+ else{\r
+ subscription_responses.erase(search);\r
+ return true;\r
+ }\r
+ \r
+}\r
+\r
+subscription_response_helper * const SubscriptionHandler::get_subscription(int id){\r
+ auto search = subscription_responses.find(id);\r
+ if(search == subscription_responses.end()){\r
+ return NULL;\r
+ }\r
+ else{\r
+ return &(subscription_responses[id]);\r
+ }\r
+};\r
+\r
+\r
+// Handles responses from RMR\r
+void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length){\r
+\r
+ bool res;\r
+ int id;\r
+ int type;\r
+ int procedureCode;\r
+ bool valid_response =false;\r
+ \r
+ E2AP_PDU_t * e2ap_recv;\r
+ asn_dec_rval_t retval;\r
+\r
+ subscription_response sub_resp;\r
+\r
+ subscription_response_helper he_response;\r
+\r
+ char buf[512];\r
+ size_t buf_size = 512;\r
+\r
+ e2ap_recv = 0;\r
+ retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);\r
+\r
+ if(retval.code != RC_OK){\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);\r
+ ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);\r
+ return ;\r
+ }\r
+ \r
+ type = e2ap_recv->present;\r
+ mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);\r
+ \r
+ if(type == E2AP_PDU_PR_successfulOutcome){\r
+\r
+ procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;\r
+ mdclog_write(MDCLOG_INFO, "Received E2AP PDU successful outcome message with procedureCode = %d", procedureCode); \r
+\r
+ if( procedureCode == ProcedureCode_id_ricSubscription){ \r
+ // subscription response\r
+ // decode the message\r
+ sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);\r
+\r
+ {\r
+ std::lock_guard<std::mutex> lock(*(_data_lock.get()));\r
+ // get the id\r
+ id = he_response.get_request_id();\r
+ // get status of id \r
+ int req_status = get_request_status(id);\r
+ if (req_status == request_pending ){\r
+ res = add_subscription_entry(id, he_response);\r
+ if(res)\r
+ set_request_status(id, request_success);\r
+ \r
+ else{\r
+ set_request_status(id, request_duplicate);\r
+ mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d seems to be a duplicate\n", __FILE__, __LINE__, id);\r
+ }\r
+ \r
+ valid_response = true;\r
+ }\r
+ else if (req_status > 0){\r
+ // we don't change status of response since it was not in pending\r
+ // we simply fail\r
+ mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);\r
+ \r
+ }\r
+ else{\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription", __FILE__, __LINE__, id);\r
+ } \r
+ \r
+ }\r
+ \r
+ }\r
+\r
+ else{\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Subscription Handler Response received E2AP PDU success response with an non-subscription response related type %d", __FILE__, __LINE__, procedureCode);\r
+ }\r
+ \r
+ }\r
+ \r
+ else if(type == E2AP_PDU_PR_unsuccessfulOutcome){\r
+ \r
+ procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;\r
+ mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode); \r
+ \r
+ if(procedureCode == ProcedureCode_id_ricSubscription){\r
+ \r
+ sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);\r
+ {\r
+ std::lock_guard<std::mutex> lock(*(_data_lock.get())); \r
+ id = he_response.get_request_id();\r
+ int req_status = get_request_status(id);\r
+ if(req_status == request_pending){\r
+ set_request_status(id, request_failed);\r
+ valid_response = true;\r
+ mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);\r
+ }\r
+ else if (req_status > 0){\r
+ // we don't changet status since it was not in pending\r
+ // we simply fail\r
+ mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);\r
+ }\r
+ else{\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription ", __FILE__, __LINE__, id);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ else{\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU with non response type %d", __FILE__, __LINE__, type);\r
+ }\r
+ \r
+ \r
+ ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);\r
+ \r
+ // wake up all waiting users ...\r
+ if(valid_response){\r
+ _cv.get()->notify_all();\r
+ }\r
+ \r
+}\r
+\r
+\r
+int const SubscriptionHandler::get_request_status(int id){\r
+ auto search = requests_table.find(id);\r
+ if (search == requests_table.end()){\r
+ return -1;\r
+ }\r
+ \r
+ return search->second;\r
+}\r
+ \r
+ bool SubscriptionHandler::is_subscription_entry(int id){\r
+ auto search = subscription_responses.find(id);\r
+ if (search != subscription_responses.end())\r
+ return true;\r
+ else\r
+ return false;\r
+}\r
+\r
+bool SubscriptionHandler::is_request_entry(int id){\r
+ auto search = requests_table.find(id);\r
+ if (search != requests_table.end())\r
+ return true;\r
+ else\r
+ return false;\r
+}\r