2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan */
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
29 #include <mdclog/mdclog.h>
31 #include <condition_variable>
32 #include <unordered_map>
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
// Status codes for subscription-delete requests.
// delete_request_pending is the status RequestSubscriptionDelete passes to
// add_request_entry when it queues a delete request.
46 delete_request_pending,
// delete_request_success is the status RequestSubscriptionDelete reports as a
// successful deletion.
47 delete_request_success,
// NOTE(review): presumably recorded when the delete attempt is rejected; no
// code in this header compares against it explicitly -- confirm.
48 delete_request_failed,
50 }Subscription_Status_Types;
53 /* Class to process subscription-related messages.
54 Each subscription request is assigned a unique, internally
55 generated request id for tracking purposes. This is because
56 the E2 subscription request does not carry any gNodeB id information. */
// Tracks in-flight subscription / subscription-delete requests by request id
// and keeps the responses of subscriptions that succeeded.
// NOTE(review): this class uses std::unique_ptr, std::mutex and std::chrono,
// and RequestSubscription writes to std::cout; <memory>, <mutex>, <chrono>
// and <iostream> are not among the headers listed above -- confirm they are
// included (directly or transitively).
60 class SubscriptionHandler {
63 SubscriptionHandler(void);
// NOTE(review): the parameters are unnamed here; presumably the timeout (in
// seconds) and the number of retries -- confirm against the definition.
64 SubscriptionHandler(unsigned int, unsigned int);
// Encodes and transmits a subscription request, then blocks on _cv until the
// request status is no longer pending or the wait is abandoned.
// Arguments: request helper (its request id is overwritten with a fresh one),
// response helper to fill in, transmit code, and a callable invoked as
// tx(code, length, buffer) whose result is treated as a bool.
67 template <typename Transmitter>
68 bool RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
// Same flow for deleting an existing subscription; the id carried by the
// request helper must already be a known subscription.
70 template<typename Transmitter>
71 bool RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
// NOTE(review): presumably the entry point for received messages that routes
// to the Process* handlers below and wakes waiters on _cv -- confirm.
74 void Response(int, unsigned char *, int);
// Status recorded for a request id; the request functions compare the result
// against Subscription_Status_Types values.
// NOTE(review): the top-level const on a by-value return type (here and on
// get_subscription) has no effect on callers.
75 int const get_request_status(int);
76 subscription_response_helper * const get_subscription(int);
// Supplies the id that RequestSubscription stamps on each new request.
78 unsigned int get_next_id(void);
// Setters; presumably for _time_out, _time_out_flag and _num_retries -- confirm.
79 void set_timeout(unsigned int);
80 void set_timeout_flag(bool);
81 void set_num_retries(unsigned int);
// Existence checks used by RequestSubscriptionDelete before it queues a delete.
83 bool is_subscription_entry(int);
84 bool is_request_entry(int);
87 size_t num_pending(void) const;
88 size_t num_complete(void) const ;
// Handlers for the individual response / failure message types.
// Each takes a byte buffer and its length.
94 void ProcessSubscriptionResponse(unsigned char *, int len);
95 void ProcessSubscriptionFailure(unsigned char *, int len);
97 void ProcessDeleteResponse(unsigned char *, int len);
98 void ProcessSubscriptionDeleteFailure(unsigned char *, int len);
// Request bookkeeping: callers pass (request id, status) to add_request_entry.
// NOTE(review): presumably these operate on requests_table -- confirm.
100 bool add_request_entry(int, int);
101 bool set_request_status(int, int);
102 bool delete_request_entry(int);
// Bookkeeping for stored subscription responses.
104 bool get_subscription_entry(int, int);
105 bool add_subscription_entry(int, subscription_response_helper &he);
106 bool delete_subscription_entry(int);
// NOTE(review): presumably request id -> status -- confirm.
108 std::unordered_map<int, int> requests_table;
109 std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
// Mutex taken by the request functions around table updates and while waiting on _cv.
111 std::unique_ptr<std::mutex> _data_lock;
// Condition variable the request functions block on while a request is pending.
112 std::unique_ptr<std::condition_variable> _cv;
// Length of a single wait_for() on _cv.
114 std::chrono::seconds _time_out;
// A pending request is given up once more than _num_retries * _time_out has
// elapsed, and only when _time_out_flag is set.
115 unsigned int _num_retries = 2;
116 bool _time_out_flag = true;
// NOTE(review): presumably the counter behind get_next_id() -- confirm.
117 unsigned int unique_request_id = 0;
// Sends a subscription request and waits for its outcome.
//
// he       request description; its request id is replaced with a newly
//          generated one before encoding.
// response receives the stored subscription response once the request has
//          succeeded.
// TxCode   passed through unchanged as the first argument of tx.
// tx       callable invoked as tx(TxCode, buf_len, buffer); its result is
//          assigned to res.
//
// NOTE(review): presumably returns true only when the request reaches
// request_success and false on encode, queueing, transmit, timeout or failure
// paths -- confirm against the return statements.
121 template <typename Transmitter>
122 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
// Fixed-size scratch buffer for the encoded PDU; buf_len is passed by address
// to the encoder.
125 unsigned char buffer[512];
126 size_t buf_len = 512;
128 // get a new unique request id ...
129 unsigned int new_req_id = get_next_id();
// NOTE(review): debug output goes to std::cout while the rest of the function
// logs through mdclog_write.
130 std::cout <<"Using id = " << new_req_id << std::endl;
131 he.set_request(new_req_id, he.get_req_seq());
133 E2AP_PDU_t *e2ap_pdu = 0;
134 subscription_request e2ap_sub_req;
136 // generate the request pdu
137 res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);
// NOTE(review): the format string ends in "Reason = " but no reason argument
// is supplied; the delete path logs get_error() at the equivalent point.
139 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
143 // put entry in request table
// The table update is done under _data_lock. This guard has to be released
// before the unique_lock below is constructed, because both lock the same
// std::mutex.
145 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
146 res = add_request_entry(he.get_request_id(), request_pending);
148 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
// Lock used for the wait on _cv below. It is constructed before tx is called.
154 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
159 res = tx(TxCode, buf_len, buffer);
// Transmit-failure cleanup: drop the pending entry and log.
162 delete_request_entry(he.get_request_id());
163 mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());
168 // record time stamp ..
// NOTE(review): system_clock can be adjusted while waiting; steady_clock is
// the usual choice for measuring elapsed time.
169 auto start = std::chrono::system_clock::now();
175 // wait to be woken up
// wait_for releases _local_lock while blocked and holds it again on return.
// No predicate is passed, so the status is re-read after every wake-up.
176 _cv.get()->wait_for(_local_lock, _time_out);
178 // we have woken and acquired data_lock
179 // check status and return appropriate object
181 int status = get_request_status(he.get_request_id());
183 if (status == request_success){
184 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
188 if (status == request_pending){
189 // woken up spuriously or timed out
190 auto end = std::chrono::system_clock::now();
191 std::chrono::duration<double> f = end - start;
// Give up only when timeouts are enabled and the total elapsed time exceeds
// _num_retries waits of _time_out each.
193 if (_time_out_flag && f > _num_retries * _time_out){
194 delete_request_entry(he.get_request_id());
195 mdclog_write(MDCLOG_ERR, "Subscription request %d timed out waiting for response ", he.get_request_id());
198 _local_lock.unlock();
202 mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());
207 // if we are here, some spurious
208 // status obtained or request failed . we return false
209 delete_request_entry(he.get_request_id());
212 _local_lock.unlock();
218 // retrieve the subscription response and clear queue
// NOTE(review): unordered_map::operator[] default-inserts an entry when the
// key is absent; this relies on the response having been stored under this
// request id before the status became request_success -- confirm.
219 response = subscription_responses[he.get_request_id()];
220 delete_request_entry(he.get_request_id());
223 _local_lock.unlock();
// Sends a subscription-delete request for the subscription identified by
// he.get_request_id() and waits for its outcome.
//
// he       identifies the subscription to delete; unlike RequestSubscription,
//          no new request id is generated.
// response NOTE(review): not referenced by any statement shown in this
//          function -- confirm whether it is meant to be filled in.
// TxCode   passed through unchanged as the first argument of tx.
// tx       callable invoked as tx(TxCode, buf_len, buffer); its result is
//          assigned to res.
//
// NOTE(review): presumably returns true only when the request reaches
// delete_request_success -- confirm against the return statements.
229 template <typename Transmitter>
230 bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
// NOTE(review): the two table checks below and add_request_entry further down
// run before _local_lock is constructed, and no other lock acquisition is
// shown here, whereas RequestSubscription wraps add_request_entry in a
// lock_guard. Confirm whether these helpers lock internally.
234 // First check if we have this subscription
235 if(! is_subscription_entry(he.get_request_id())){
236 mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id());
240 // Also check if such a request is queued
241 if (is_request_entry(he.get_request_id())){
242 mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id());
246 subscription_delete e2ap_sub_req_del;
248 // generate the delete request pdu
// Fixed-size scratch buffer for the encoded PDU; buf_len is passed by address
// to the encoder.
249 unsigned char buffer[128];
250 size_t buf_len = 128;
252 res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
254 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
258 // put entry in request table
259 res = add_request_entry(he.get_request_id(), delete_request_pending);
261 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
// Lock used for the wait on _cv below. It is constructed before tx is called.
265 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
268 res = tx(TxCode, buf_len, buffer);
// Transmit-failure cleanup: drop the pending entry and log.
271 delete_request_entry(he.get_request_id());
272 mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
277 // record time stamp ..
// NOTE(review): system_clock can be adjusted while waiting; steady_clock is
// the usual choice for measuring elapsed time.
278 auto start = std::chrono::system_clock::now();
282 // wait to be woken up
// wait_for releases _local_lock while blocked and holds it again on return.
// No predicate is passed, so the status is re-read after every wake-up.
283 _cv.get()->wait_for(_local_lock, _time_out);
285 // check status and return appropriate object
286 int status = get_request_status(he.get_request_id());
287 if (status == delete_request_success){
// NOTE(review): only the request-table entry is removed here; no call that
// removes the stored subscription (e.g. delete_subscription_entry) is shown
// in this function -- confirm it happens elsewhere.
288 delete_request_entry(he.get_request_id());
289 mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
290 _local_lock.unlock();
294 if (status == delete_request_pending){
295 // woken up spuriously or timed out
296 auto end = std::chrono::system_clock::now();
297 std::chrono::duration<double> f = end - start;
// Give up only when timeouts are enabled and the total elapsed time exceeds
// _num_retries waits of _time_out each.
299 if (_time_out_flag && f > _num_retries * _time_out){
300 delete_request_entry(he.get_request_id());
301 mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
302 _local_lock.unlock();
306 mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id());
312 // if we are here, some spurious
313 // status obtained. we return false
315 delete_request_entry(he.get_request_id());
316 _local_lock.unlock();