2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
37 #include "subscription_delete_request.hpp"
38 #include "subscription_delete_response.hpp"
39 #include "subscription_request.hpp"
40 #include "subscription_response.hpp"
// Result codes produced by the subscription request functions in this header.
// Request completed and its status was request_success.
42 #define SUBSCR_SUCCESS 0
// Returned on the transmit-error path, after the request entry has been removed.
43 #define SUBSCR_ERR_TX 1
// Request was still pending after more than _num_retries * _time_out had elapsed.
44 #define SUBSCR_ERR_TIMEOUT 2
// Request status was request_failed (a failure response was received).
45 #define SUBSCR_ERR_FAIL 3
// Request ended in a status that none of the other cases handle.
46 #define SUBSCR_ERR_UNKNOWN 4
// A request with an identical key was already queued, or the status was request_duplicate.
47 #define SUBSCR_ERR_DUPLICATE 5
// Encoding of the request PDU failed (referenced only by the commented-out delete path in this header).
48 #define SUBSCR_ERR_ENCODE 6
// Subscription to delete does not exist (referenced only by the commented-out delete path in this header).
49 #define SUBSCR_ERR_MISSING 7
// Status values for subscription-delete requests: pending is the state a
// delete request is queued with, success/failed are the states checked after
// waiting (see the commented-out request_subscription_delete in this header).
57 delete_request_pending,
58 delete_request_success,
59 delete_request_failed,
61 }Subscription_Status_Types;
// Key identifying a subscription: (node id, function id).
using subscription_identifier = std::tuple<std::string , int>;

// Hash functor for subscription_identifier keys in unordered containers.
//
// The two tuple elements are hashed independently and then mixed, rather than
// hashing the concatenation "node_id + to_string(function_id)". Concatenation
// made distinct keys such as ("a1", 2) and ("a", 12) hash the same string
// ("a12") and therefore always collide; it also allocated a temporary string
// on every lookup.
struct subscription_hasher {
  size_t operator()(const subscription_identifier & key) const {
    const size_t node_hash = std::hash<std::string>{}(std::get<0>(key));
    const size_t function_hash = std::hash<int>{}(std::get<1>(key));
    // Boost-style hash_combine: fold the second hash into the first.
    return node_hash ^ (function_hash + 0x9e3779b9 + (node_hash << 6) + (node_hash >> 2));
  }
};
// Tracks subscription requests and stored subscription responses.
// State declared here: a table of request states keyed by
// subscription_identifier, a table of subscription responses, and a
// mutex / condition-variable pair that request_subscription() locks and
// waits on while a request is outstanding.
71 class SubscriptionHandler {
// Defaults: timeout_seconds = 5, num_retries = 2.
75 SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
// Issues a subscription request for the given node id and blocks until its
// status changes or the wait window elapses; yields a SUBSCR_* code.
// Transmitter is a caller-supplied callable (defined below in this header).
79 template <typename Transmitter>
80 int request_subscription(std::string, int , Transmitter &&);
// Delete counterpart of request_subscription. NOTE(review): the only
// implementation visible in this header is commented out — confirm whether a
// definition exists elsewhere before calling.
82 template<typename Transmitter>
83 int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
// NOTE(review): presumably the entry point for incoming responses that
// updates the request status and wakes waiters — confirm against the
// definition, which is not in this header.
85 void Response(int, unsigned char *, int, const char *);
// Returns the stored status for a request key; request_subscription()
// compares the result against the Subscription_Status_Types values.
86 int const get_request_status(subscription_identifier);
// Returns a pointer to a stored subscription response. NOTE(review):
// behaviour for a missing key and pointer lifetime are not visible here.
87 subscription_response_helper * const get_subscription(subscription_identifier);
89 unsigned int get_next_id(void);
// Setters for the wait timeout and retry count (see _time_out / _num_retries).
90 void set_timeout(unsigned int);
91 void set_num_retries(unsigned int);
// Membership checks on the subscription table and the request table.
93 bool is_subscription_entry(subscription_identifier);
94 bool is_request_entry(subscription_identifier);
// Output parameter receives subscription keys.
95 void get_subscription_keys(std::vector<subscription_identifier> &);
// NOTE(review): presumably the sizes of requests_table and
// subscription_responses respectively — confirm against the definitions.
97 size_t num_pending(void) const;
98 size_t num_complete(void) const ;
// Request-table helpers. request_subscription() calls add_request_entry and
// delete_request_entry while holding _data_lock.
105 bool add_request_entry(subscription_identifier, int);
106 bool set_request_status(subscription_identifier, int);
107 bool delete_request_entry(subscription_identifier);
// Subscription-table helpers.
109 bool get_subscription_entry(subscription_identifier);
110 bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
111 bool delete_subscription_entry(subscription_identifier);
// Maps a request key to its current status value.
113 std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
114 std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
// Mutex and condition variable used by request_subscription() to lock the
// tables and to wait for a status change.
116 std::unique_ptr<std::mutex> _data_lock;
117 std::unique_ptr<std::condition_variable> _cv;
// Duration of a single wait_for() in request_subscription(); the overall
// timeout there is _num_retries * _time_out.
119 std::chrono::seconds _time_out;
120 unsigned int _num_retries = 2;
121 unsigned int unique_request_id = 0;
// Queues a subscription request keyed by (node_id, 0), then waits on the
// handler's condition variable until the request reaches a terminal status or
// more than _num_retries * _time_out has elapsed.
//
// node_id : first element of the subscription key.
// msgcode : NOTE(review): presumably forwarded to tx — confirm.
// tx      : caller-supplied transmitter; NOTE(review): presumably invoked to
//           send the request before waiting — confirm.
//
// Result codes set or returned here: SUBSCR_SUCCESS, SUBSCR_ERR_DUPLICATE,
// SUBSCR_ERR_TX, SUBSCR_ERR_TIMEOUT, SUBSCR_ERR_FAIL, SUBSCR_ERR_UNKNOWN.
125 template <typename Transmitter>
126 int SubscriptionHandler::request_subscription(std::string node_id, int msgcode, Transmitter && tx){
128 // generate subscription identifier
129 subscription_identifier sub_id = std::make_tuple (node_id, 0); //0 is the function id which is hardcoded, which should come from rnib
133 // put entry in request table
// The entry is added in the request_pending state while holding _data_lock.
135 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
136 res = add_request_entry(sub_id, request_pending);
// Duplicate-key path: the request is rejected without transmitting.
138 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
139 return SUBSCR_ERR_DUPLICATE;
// Lock used for the remainder of the function, including the wait_for() below.
144 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
// Transmit-error path: remove the pending entry so the key can be reused.
152 delete_request_entry(sub_id);
153 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
154 return SUBSCR_ERR_TX;
158 // record time stamp ..
// NOTE(review): system_clock is a wall clock and can jump; a steady clock
// would make the elapsed-time check below robust to clock adjustments.
159 auto start = std::chrono::system_clock::now();
160 res = SUBSCR_ERR_UNKNOWN;
163 // release lock and wait to be woken up
// wait_for() is called without a predicate, so it can also return on a
// spurious wake-up; the status checks below handle that case.
164 _cv.get()->wait_for(_local_lock, _time_out);
166 // we have woken and acquired data_lock
167 // check status and return appropriate object
169 int status = get_request_status(sub_id);
171 if (status == request_success){
172 // retrieve & store the subscription response (why?)
173 // response = subscription_responses[sub_id];
174 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
175 res = SUBSCR_SUCCESS;
179 if (status == request_pending){
180 // woken up spuriously or timed out
181 auto end = std::chrono::system_clock::now();
182 std::chrono::duration<double> f = end - start;
// Give up only once the total elapsed time exceeds _num_retries * _time_out;
// otherwise the request is logged as still waiting.
184 if ( f > _num_retries * _time_out){
185 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
186 res = SUBSCR_ERR_TIMEOUT;
190 mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
195 if(status == request_failed){
196 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
197 res = SUBSCR_ERR_FAIL;
201 if (status == request_duplicate){
202 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
203 res = SUBSCR_ERR_DUPLICATE;
208 // if we are here, some spurious
209 // status obtained or request failed . we return appropriate error code
210 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
211 res = SUBSCR_ERR_UNKNOWN;
// The request entry is removed once the outcome has been decided.
215 delete_request_entry(sub_id);
// Unlock before writing the diagnostic so the I/O is not done under the lock.
// NOTE(review): this prints to std::cout with std::endl (forces a flush)
// rather than using mdclog like the rest of the function.
218 _local_lock.unlock();
219 std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl;
224 /*template <typename Transmitter>
225 int SubscriptionHandler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
228 // generate subscription identifier
229 subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
231 // First check if we have this subscription
232 if(! is_subscription_entry(sub_id)){
233 mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
234 return SUBSCR_ERR_MISSING;
237 // Also check if such a request is queued
238 if (is_request_entry(sub_id)){
239 mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
240 return SUBSCR_ERR_DUPLICATE;
243 subscription_delete e2ap_sub_req_del;
245 // generate the delete request pdu
246 unsigned char buffer[128];
247 size_t buf_len = 128;
249 res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
251 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
252 return SUBSCR_ERR_ENCODE;
255 // put entry in request table
257 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
258 res = add_request_entry(sub_id, delete_request_pending);
260 mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
261 return SUBSCR_ERR_DUPLICATE;
265 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
268 res = tx(TxCode, buf_len, buffer);
271 delete_request_entry(sub_id);
272 mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
273 return SUBSCR_ERR_TX;
277 // record time stamp ..
278 auto start = std::chrono::system_clock::now();
280 res = SUBSCR_ERR_UNKNOWN;
283 // wait to be woken up
284 _cv.get()->wait_for(_local_lock, _time_out);
286 // check status and return appropriate object
287 int status = get_request_status(sub_id);
288 if (status == delete_request_success){
289 mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
290 res = SUBSCR_SUCCESS;
294 if (status == delete_request_pending){
295 // woken up spuriously or timed out
296 auto end = std::chrono::system_clock::now();
297 std::chrono::duration<double> f = end - start;
299 if (f > _num_retries * _time_out){
300 mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
301 res = SUBSCR_ERR_TIMEOUT;
305 mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
311 if(status == delete_request_failed){
312 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
313 res = SUBSCR_ERR_FAIL;
317 // if we are here, some spurious
318 // status obtained. we return false
320 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
321 res = SUBSCR_ERR_UNKNOWN;
326 delete_request_entry(sub_id);
329 _local_lock.unlock();
330 std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;