2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
29 #include <mdclog/mdclog.h>
31 #include <condition_variable>
32 #include <unordered_map>
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
46 delete_request_pending,
47 delete_request_success,
48 delete_request_failed,
50 }Subscription_Status_Types;
53 /* Class to process subscription related messages
54 each subscription request is assigned a unique internally
55 generated request id for tracking purposes. this is because
56 the e2 subscription request does not carry any gnodeb id information
60 class SubscriptionHandler {
63 SubscriptionHandler(void);
64 SubscriptionHandler(unsigned int, unsigned int);
67 template <typename Transmitter>
68 bool RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
70 template<typename Transmitter>
71 bool RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
74 void Response(int, unsigned char *, int);
75 int const get_request_status(int);
76 subscription_response_helper * const get_subscription(int);
78 unsigned int get_next_id(void);
79 void set_timeout(unsigned int);
80 void set_timeout_flag(bool);
81 void set_num_retries(unsigned int);
83 bool is_subscription_entry(int);
84 bool is_request_entry(int);
87 size_t num_pending(void) const;
88 size_t num_complete(void) const ;
94 void ProcessSubscriptionResponse(unsigned char *, int len);
95 void ProcessSubscriptionFailure(unsigned char *, int len);
97 void ProcessDeleteResponse(unsigned char *, int len);
98 void ProcessSubscriptionDeleteFailure(unsigned char *, int len);
100 bool add_request_entry(int, int);
101 bool set_request_status(int, int);
102 bool delete_request_entry(int);
104 bool get_subscription_entry(int, int);
105 bool add_subscription_entry(int, subscription_response_helper &he);
106 bool delete_subscription_entry(int);
108 std::unordered_map<int, int> requests_table;
109 std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
111 std::unique_ptr<std::mutex> _data_lock;
112 std::unique_ptr<std::condition_variable> _cv;
114 std::chrono::seconds _time_out;
115 unsigned int _num_retries = 2;
116 bool _time_out_flag = true;
117 unsigned int unique_request_id = 0;
121 template <typename Transmitter>
122 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
125 unsigned char buffer[512];
126 size_t buf_len = 512;
128 // get a new unique request id ...
129 unsigned int new_req_id = get_next_id();
130 mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
131 he.set_request(new_req_id, he.get_req_seq());
133 E2AP_PDU_t *e2ap_pdu = 0;
134 subscription_request e2ap_sub_req;
136 // generate the request pdu
137 res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);
139 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
143 // put entry in request table
145 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
146 res = add_request_entry(he.get_request_id(), request_pending);
148 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
154 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
158 res = tx(TxCode, buf_len, buffer);
161 delete_request_entry(he.get_request_id());
162 mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());
167 // record time stamp ..
168 auto start = std::chrono::system_clock::now();
173 // release lock and wait to be woken up
174 _cv.get()->wait_for(_local_lock, _time_out);
176 // we have woken and acquired data_lock
177 // check status and return appropriate object
179 int status = get_request_status(he.get_request_id());
181 if (status == request_success){
182 // retreive the subscription response and clear queue
183 response = subscription_responses[he.get_request_id()];
186 delete_request_entry(he.get_request_id());
189 _local_lock.unlock();
190 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
195 if (status == request_pending){
197 // woken up spuriously or timed out
198 auto end = std::chrono::system_clock::now();
199 std::chrono::duration<double> f = end - start;
201 if (_time_out_flag && f > _num_retries * _time_out){
202 delete_request_entry(he.get_request_id());
203 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
206 _local_lock.unlock();
210 mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());
215 // if we are here, some spurious
216 // status obtained or request failed . we return false
217 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
218 delete_request_entry(he.get_request_id());
221 _local_lock.unlock();
232 template <typename Transmitter>
233 bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
237 // First check if we have this subscription
238 if(! is_subscription_entry(he.get_request_id())){
239 mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id());
243 // Also check if such a request is queued
244 if (is_request_entry(he.get_request_id())){
245 mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id());
249 subscription_delete e2ap_sub_req_del;
251 // generate the delete request pdu
252 unsigned char buffer[128];
253 size_t buf_len = 128;
255 res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
257 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
261 // put entry in request table
262 res = add_request_entry(he.get_request_id(), delete_request_pending);
264 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
268 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
271 res = tx(TxCode, buf_len, buffer);
274 delete_request_entry(he.get_request_id());
275 mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
280 // record time stamp ..
281 auto start = std::chrono::system_clock::now();
285 // wait to be woken up
286 _cv.get()->wait_for(_local_lock, _time_out);
288 // check status and return appropriate object
289 int status = get_request_status(he.get_request_id());
290 if (status == delete_request_success){
291 delete_request_entry(he.get_request_id());
292 mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
293 _local_lock.unlock();
297 if (status == delete_request_pending){
298 // woken up spuriously or timed out
299 auto end = std::chrono::system_clock::now();
300 std::chrono::duration<double> f = end - start;
302 if (_time_out_flag && f > _num_retries * _time_out){
303 delete_request_entry(he.get_request_id());
304 mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
305 _local_lock.unlock();
309 mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id());
315 // if we are hear, some spurious
316 // status obtained. we return false
318 delete_request_entry(he.get_request_id());
319 _local_lock.unlock();