2 ==================================================================================
\r
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
\r
5 Licensed under the Apache License, Version 2.0 (the "License");
\r
6 you may not use this file except in compliance with the License.
\r
7 You may obtain a copy of the License at
\r
9 http://www.apache.org/licenses/LICENSE-2.0
\r
11 Unless required by applicable law or agreed to in writing, software
\r
12 distributed under the License is distributed on an "AS IS" BASIS,
\r
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
14 See the License for the specific language governing permissions and
\r
15 limitations under the License.
\r
16 ==================================================================================
\r
21 #ifndef SUBSCRIPTION_HANDLER
\r
22 #define SUBSCRIPTION_HANDLER
\r
24 #include <mdclog/mdclog.h>
\r
26 #include <condition_variable>
\r
27 #include <unordered_map>
\r
30 #include <subscription_request.hpp>
\r
31 #include <subscription_response.hpp>
\r
33 using namespace std;
\r
36 request_pending = 1,
\r
40 }Subscription_Status_Types;
\r
43 /* Class to process subscription related messages
\r
44 each subscription request is assigned a unique internally
\r
45 generated request id for tracking purposes. this is because
\r
46 the e2 subscription request does not carry any gnodeb id information
\r
50 class SubscriptionHandler {
\r
53 SubscriptionHandler(void);
\r
54 SubscriptionHandler(unsigned int, unsigned int);
\r
57 template <typename Transmitter>
\r
58 bool RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
\r
61 void Response(int, unsigned char *, int);
\r
62 int const get_request_status(int);
\r
63 subscription_response_helper * const get_subscription(int);
\r
65 unsigned int get_next_id(void);
\r
66 void set_timeout(unsigned int);
\r
67 void set_timeout_flag(bool);
\r
68 void set_num_retries(unsigned int);
\r
70 bool is_subscription_entry(int);
\r
71 bool is_request_entry(int);
\r
74 size_t num_pending(void) const;
\r
75 size_t num_complete(void) const ;
\r
81 void ProcessSubscriptionResponse(unsigned char *, int len);
\r
82 void ProcessSubscriptionFailure(unsigned char *, int len);
\r
84 bool add_request_entry(int, int);
\r
85 bool set_request_status(int, int);
\r
86 bool delete_request_entry(int);
\r
88 bool get_subscription_entry(int, int);
\r
89 bool add_subscription_entry(int, subscription_response_helper &he);
\r
90 bool delete_subscription_entry(int);
\r
92 std::unordered_map<int, int> requests_table;
\r
93 std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
\r
95 std::unique_ptr<std::mutex> _data_lock;
\r
96 std::unique_ptr<std::condition_variable> _cv;
\r
98 std::chrono::seconds _time_out;
\r
99 unsigned int _num_retries = 2;
\r
100 bool _time_out_flag = true;
\r
101 unsigned int unique_request_id = 0;
\r
105 template <typename Transmitter>
\r
106 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
\r
109 unsigned char buffer[512];
\r
110 size_t buf_len = 512;
\r
112 // get a new unique request id ...
\r
113 unsigned int new_req_id = get_next_id();
\r
114 mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
\r
115 he.set_request(new_req_id, he.get_req_seq());
\r
117 E2AP_PDU_t *e2ap_pdu = 0;
\r
118 subscription_request e2ap_sub_req;
\r
120 // generate the request pdu
\r
121 res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);
\r
123 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
\r
127 // put entry in request table
\r
129 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
\r
130 res = add_request_entry(he.get_request_id(), request_pending);
\r
132 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
\r
137 // acquire lock ...
\r
138 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
\r
141 // Send the message
\r
142 res = tx(TxCode, buf_len, buffer);
\r
145 delete_request_entry(he.get_request_id());
\r
146 mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());
\r
151 // record time stamp ..
\r
152 auto start = std::chrono::system_clock::now();
\r
157 // release lock and wait to be woken up
\r
158 _cv.get()->wait_for(_local_lock, _time_out);
\r
160 // we have woken and acquired data_lock
\r
161 // check status and return appropriate object
\r
163 int status = get_request_status(he.get_request_id());
\r
165 if (status == request_success){
\r
166 // retreive the subscription response and clear queue
\r
167 response = subscription_responses[he.get_request_id()];
\r
170 delete_request_entry(he.get_request_id());
\r
172 // release data lock
\r
173 _local_lock.unlock();
\r
174 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
\r
179 if (status == request_pending){
\r
180 // woken up spuriously or timed out
\r
181 auto end = std::chrono::system_clock::now();
\r
182 std::chrono::duration<double> f = end - start;
\r
184 if (_time_out_flag && f > _num_retries * _time_out){
\r
185 delete_request_entry(he.get_request_id());
\r
186 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
\r
188 // Release data lock
\r
189 _local_lock.unlock();
\r
193 mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());
\r
198 // if we are here, some spurious
\r
199 // status obtained or request failed . we return false
\r
200 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
\r
201 delete_request_entry(he.get_request_id());
\r
203 // release data lock
\r
204 _local_lock.unlock();
\r