--- /dev/null
+/*\r
+==================================================================================\r
+ Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#pragma once\r
+\r
+#ifndef SUBSCRIPTION_HANDLER\r
+#define SUBSCRIPTION_HANDLER\r
+\r
+#include <mdclog/mdclog.h>\r
+#include <mutex>\r
+#include <condition_variable>\r
+#include <unordered_map>\r
+#include <chrono>\r
+\r
+#include <subscription_request.hpp>\r
+#include <subscription_response.hpp>\r
+\r
+using namespace std;\r
+\r
+typedef enum {\r
+ request_pending = 1,\r
+ request_success,\r
+ request_failed,\r
+ request_duplicate\r
+}Subscription_Status_Types;\r
+\r
+\r
+/* Class to process subscription related messages \r
+ each subscription request is assigned a unique internally\r
+generated request id for tracking purposes. this is because\r
+the e2 subscription request does not carry any gnodeb id information\r
+\r
+*/\r
+\r
+class SubscriptionHandler {\r
+\r
+public:\r
+ SubscriptionHandler(void);\r
+ SubscriptionHandler(unsigned int, unsigned int);\r
+ \r
+ void init(void);\r
+ template <typename Transmitter>\r
+ bool RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);\r
+\r
+\r
+ void Response(int, unsigned char *, int);\r
+ int const get_request_status(int);\r
+ subscription_response_helper * const get_subscription(int);\r
+\r
+ unsigned int get_next_id(void);\r
+ void set_timeout(unsigned int);\r
+ void set_timeout_flag(bool);\r
+ void set_num_retries(unsigned int);\r
+ \r
+ bool is_subscription_entry(int); \r
+ bool is_request_entry(int);\r
+\r
+ void clear(void);\r
+ size_t num_pending(void) const;\r
+ size_t num_complete(void) const ;\r
+\r
+\r
+ \r
+private:\r
+\r
+ void ProcessSubscriptionResponse(unsigned char *, int len);\r
+ void ProcessSubscriptionFailure(unsigned char *, int len);\r
+\r
+ bool add_request_entry(int, int);\r
+ bool set_request_status(int, int);\r
+ bool delete_request_entry(int);\r
+ \r
+ bool get_subscription_entry(int, int);\r
+ bool add_subscription_entry(int, subscription_response_helper &he);\r
+ bool delete_subscription_entry(int);\r
+ \r
+ std::unordered_map<int, int> requests_table;\r
+ std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions\r
+ \r
+ std::unique_ptr<std::mutex> _data_lock;\r
+ std::unique_ptr<std::condition_variable> _cv;\r
+\r
+ std::chrono::seconds _time_out;\r
+ unsigned int _num_retries = 2;\r
+ bool _time_out_flag = true;\r
+ unsigned int unique_request_id = 0;\r
+ \r
+};\r
+\r
+template <typename Transmitter>\r
+bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){\r
+ \r
+ bool res;\r
+ unsigned char buffer[512];\r
+ size_t buf_len = 512;\r
+\r
+ // get a new unique request id ...\r
+ unsigned int new_req_id = get_next_id();\r
+ mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);\r
+ he.set_request(new_req_id, he.get_req_seq());\r
+ \r
+ E2AP_PDU_t *e2ap_pdu = 0;\r
+ subscription_request e2ap_sub_req;\r
+ \r
+ // generate the request pdu\r
+ res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);\r
+ if(! res){\r
+ mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);\r
+ return false;\r
+ }\r
+ \r
+ // put entry in request table\r
+ {\r
+ std::lock_guard<std::mutex> lock(*(_data_lock.get()));\r
+ res = add_request_entry(he.get_request_id(), request_pending);\r
+ if(! res){\r
+ mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());\r
+ return false;\r
+ }\r
+ }\r
+\r
+ // acquire lock ...\r
+ std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));\r
+\r
+\r
+ // Send the message\r
+ res = tx(TxCode, buf_len, buffer);\r
+ if (!res){\r
+ // clear state\r
+ delete_request_entry(he.get_request_id());\r
+ mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());\r
+ return false;\r
+ };\r
+\r
+ \r
+ // record time stamp ..\r
+ auto start = std::chrono::system_clock::now();\r
+ \r
+ while(1){\r
+\r
+\r
+ // release lock and wait to be woken up\r
+ _cv.get()->wait_for(_local_lock, _time_out);\r
+ \r
+ // we have woken and acquired data_lock \r
+ // check status and return appropriate object\r
+ \r
+ int status = get_request_status(he.get_request_id());\r
+ \r
+ if (status == request_success){\r
+ // retreive the subscription response and clear queue\r
+ response = subscription_responses[he.get_request_id()];\r
+\r
+ // clear state\r
+ delete_request_entry(he.get_request_id());\r
+ \r
+ // release data lock\r
+ _local_lock.unlock();\r
+ mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());\r
+ \r
+ break;\r
+ }\r
+ \r
+ if (status == request_pending){\r
+ // woken up spuriously or timed out \r
+ auto end = std::chrono::system_clock::now();\r
+ std::chrono::duration<double> f = end - start;\r
+ \r
+ if (_time_out_flag && f > _num_retries * _time_out){\r
+ delete_request_entry(he.get_request_id());\r
+ mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());\r
+\r
+ // Release data lock\r
+ _local_lock.unlock();\r
+ return false;\r
+ }\r
+ else{\r
+ mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id()); \r
+ continue;\r
+ }\r
+ }\r
+\r
+ // if we are here, some spurious\r
+ // status obtained or request failed . we return false\r
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);\r
+ delete_request_entry(he.get_request_id());\r
+ \r
+ // release data lock\r
+ _local_lock.unlock();\r
+\r
+ return false;\r
+ \r
+ };\r
+ \r
+ return true;\r
+};\r
+\r
+#endif\r