2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
29 #include <mdclog/mdclog.h>
31 #include <condition_variable>
32 #include <unordered_map>
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
40 #define SUBSCR_SUCCESS 0
41 #define SUBSCR_ERR_TX 1
42 #define SUBSCR_ERR_TIMEOUT 2
43 #define SUBSCR_ERR_FAIL 3
44 #define SUBSCR_ERR_UNKNOWN 4
45 #define SUBSCR_ERR_DUPLICATE 5
46 #define SUBSCR_ERR_ENCODE 6
47 #define SUBSCR_ERR_MISSING 7
55 delete_request_pending,
56 delete_request_success,
57 delete_request_failed,
59 }Subscription_Status_Types;
62 /* Class to process subscription related messages
63 each subscription request is assigned a unique internally
64 generated request id for tracking purposes. this is because
65 the e2 subscription request does not carry any gnodeb id information
69 class subscription_handler {
72 subscription_handler(void);
73 subscription_handler(unsigned int, unsigned int);
76 template <typename Transmitter>
77 int RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
79 template<typename Transmitter>
80 int RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
83 void Response(int, unsigned char *, int);
84 int const get_request_status(int);
85 subscription_response_helper * const get_subscription(int);
87 unsigned int get_next_id(void);
88 void set_timeout(unsigned int);
89 void set_num_retries(unsigned int);
91 bool is_subscription_entry(int);
92 bool is_request_entry(int);
95 size_t num_pending(void) const;
96 size_t num_complete(void) const ;
102 bool add_request_entry(int, int);
103 bool set_request_status(int, int);
104 bool delete_request_entry(int);
106 bool get_subscription_entry(int, int);
107 bool add_subscription_entry(int, subscription_response_helper &he);
108 bool delete_subscription_entry(int);
110 std::unordered_map<int, int> requests_table;
111 std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
113 std::unique_ptr<std::mutex> _data_lock;
114 std::unique_ptr<std::condition_variable> _cv;
116 std::chrono::seconds _time_out;
117 unsigned int _num_retries = 2;
118 unsigned int unique_request_id = 0;
122 template <typename Transmitter>
123 int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
126 unsigned char buffer[512];
127 size_t buf_len = 512;
129 // get a new unique request id ...
130 unsigned int new_req_id = get_next_id();
131 mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
132 he.set_request(new_req_id, he.get_req_seq());
135 subscription_request e2ap_sub_req;
137 // generate the request pdu
138 res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
140 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
141 return SUBSCR_ERR_ENCODE;
144 // put entry in request table
146 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
147 res = add_request_entry(he.get_request_id(), request_pending);
149 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue", __FILE__, __LINE__, he.get_request_id());
150 return SUBSCR_ERR_UNKNOWN;
155 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
159 res = tx(TxCode, buf_len, buffer);
162 delete_request_entry(he.get_request_id());
163 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__, he.get_request_id());
164 return SUBSCR_ERR_TX;
168 // record time stamp ..
169 auto start = std::chrono::system_clock::now();
170 res = SUBSCR_ERR_UNKNOWN;
175 // release lock and wait to be woken up
176 _cv.get()->wait_for(_local_lock, _time_out);
178 // we have woken and acquired data_lock
179 // check status and return appropriate object
181 int status = get_request_status(he.get_request_id());
183 if (status == request_success){
185 // retreive & store the subscription response
186 response = subscription_responses[he.get_request_id()];
187 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
188 res = SUBSCR_SUCCESS;
192 if (status == request_pending){
194 // woken up spuriously or timed out
195 auto end = std::chrono::system_clock::now();
196 std::chrono::duration<double> f = end - start;
198 if ( f > _num_retries * _time_out){
199 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
200 res = SUBSCR_ERR_TIMEOUT;
201 std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl;
205 mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());
210 if(status == request_failed){
211 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
212 res = SUBSCR_ERR_FAIL;
216 // if we are here, some spurious
217 // status obtained or request failed . we return false
218 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
219 res = SUBSCR_ERR_UNKNOWN;
223 delete_request_entry(he.get_request_id());
226 _local_lock.unlock();
227 std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl;
232 template <typename Transmitter>
233 int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
237 // First check if we have this subscription
238 if(! is_subscription_entry(he.get_request_id())){
239 mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id());
240 return SUBSCR_ERR_MISSING;
243 // Also check if such a request is queued
244 if (is_request_entry(he.get_request_id())){
245 mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id());
246 return SUBSCR_ERR_UNKNOWN;
249 subscription_delete e2ap_sub_req_del;
251 // generate the delete request pdu
252 unsigned char buffer[128];
253 size_t buf_len = 128;
255 res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
257 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
258 return SUBSCR_ERR_ENCODE;
261 // put entry in request table
262 res = add_request_entry(he.get_request_id(), delete_request_pending);
264 mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
265 return SUBSCR_ERR_UNKNOWN;
268 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
271 res = tx(TxCode, buf_len, buffer);
274 delete_request_entry(he.get_request_id());
275 mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
276 return SUBSCR_ERR_TX;
280 // record time stamp ..
281 auto start = std::chrono::system_clock::now();
283 res = SUBSCR_ERR_UNKNOWN;
286 // wait to be woken up
287 _cv.get()->wait_for(_local_lock, _time_out);
289 // check status and return appropriate object
290 int status = get_request_status(he.get_request_id());
291 if (status == delete_request_success){
292 mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
293 res = SUBSCR_SUCCESS;
297 if (status == delete_request_pending){
298 // woken up spuriously or timed out
299 auto end = std::chrono::system_clock::now();
300 std::chrono::duration<double> f = end - start;
302 if (f > _num_retries * _time_out){
303 mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
304 res = SUBSCR_ERR_TIMEOUT;
308 mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id());
314 if(status == delete_request_failed){
315 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
316 res = SUBSCR_ERR_FAIL;
319 // if we are here, some spurious
320 // status obtained. we return false
322 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
323 res = SUBSCR_ERR_UNKNOWN;
328 delete_request_entry(he.get_request_id());
331 _local_lock.unlock();
332 std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl;