2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
23 #include "subs_mgmt.hpp"
28 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
32 void SubscriptionHandler::init(void){
34 _data_lock = std::make_unique<std::mutex>();
35 _cv = std::make_unique<std::condition_variable>();
39 void SubscriptionHandler::clear(void){
41 std::lock_guard<std::mutex> lock(*(_data_lock).get());
42 requests_table.clear();
43 subscription_responses.clear();
48 size_t SubscriptionHandler::num_pending(void) const {
49 return requests_table.size();
52 size_t SubscriptionHandler::num_complete(void) const {
53 return subscription_responses.size();
57 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
58 _time_out = std::chrono::seconds(timeout_seconds);
61 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
62 _num_retries = num_tries;
67 bool SubscriptionHandler::add_request_entry(subscription_identifier id, int status){
69 // add entry in hash table if it does not exist
70 auto search = requests_table.find(id);
71 if(search != requests_table.end()){
75 requests_table[id] = status;
80 bool SubscriptionHandler::set_request_status(subscription_identifier id, int status){
82 // change status of a request only if it exists.
83 auto search = requests_table.find(id);
84 if(search != requests_table.end()){
85 requests_table[id] = status;
94 bool SubscriptionHandler::delete_request_entry(subscription_identifier id){
96 auto search = requests_table.find(id);
97 if (search != requests_table.end()){
98 requests_table.erase(search);
105 bool SubscriptionHandler::add_subscription_entry(subscription_identifier id, subscription_response_helper &he){
107 auto search = subscription_responses.find(id);
108 if (search == subscription_responses.end()){
109 subscription_responses[id] = he;
117 bool SubscriptionHandler::delete_subscription_entry(subscription_identifier id){
119 auto search = subscription_responses.find(id);
120 if(search == subscription_responses.end()){
124 subscription_responses.erase(search);
130 subscription_response_helper * const SubscriptionHandler::get_subscription(subscription_identifier id){
131 auto search = subscription_responses.find(id);
132 if(search == subscription_responses.end()){
136 return &(subscription_responses[id]);
141 // Handles responses from RMR
142 void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length, const char * node_id){
145 std::string node(node_id);
148 bool valid_response =false;
150 E2N_E2AP_PDU_t * e2ap_recv;
151 asn_dec_rval_t retval;
153 subscription_response sub_resp;
154 subscription_delete_response sub_del_resp;
156 subscription_response_helper he_response;
160 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
162 if(retval.code != RC_OK){
163 mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
164 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
168 type = e2ap_recv->present;
169 mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
171 if(type == E2N_E2AP_PDU_PR_successfulOutcome){
173 procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;
174 mdclog_write(MDCLOG_INFO, "Received E2N_E2AP PDU successful outcome message with procedureCode = %d", procedureCode);
176 if( procedureCode == E2N_ProcedureCode_id_ricSubscription){
177 // subscription response
178 // decode the message
179 sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
181 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
183 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
186 int req_status = get_request_status(id);
187 if (req_status == request_pending ){
188 res = add_subscription_entry(id, he_response);
190 set_request_status(id, request_success);
193 set_request_status(id, request_duplicate);
194 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d seems to be a duplicate. Subscription already present in subscription table\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
197 valid_response = true;
199 else if (req_status > 0){
200 // we don't change status of response since it was not in pending
202 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
206 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
213 else if( procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
215 res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
217 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
220 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
222 int req_status = get_request_status(id);
223 if (req_status == delete_request_pending ){
224 // Remove the subscription from the table
225 res = delete_subscription_entry(id);
227 set_request_status(id, delete_request_success);
228 valid_response = true;
231 set_request_status(id, delete_request_failed);
232 mdclog_write(MDCLOG_ERR, "%s, %d: Error deleting subscription entry for %s, %d", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
233 valid_response = true;
236 else if (req_status > 0){
237 // we don't change status since it was not in pending
239 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, std::get<0>(id).c_str(), std::get<1>(id));
242 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find request id %s, %d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
249 mdclog_write(MDCLOG_ERR, "%s, %d: Subscription Handler Response received E2AP PDU success response with an non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
254 else if(type == E2N_E2AP_PDU_PR_unsuccessfulOutcome){
256 procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
257 mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode);
259 if(procedureCode == E2N_ProcedureCode_id_ricSubscription){
261 sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
263 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
266 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
268 int req_status = get_request_status(id);
269 if(req_status == request_pending){
270 set_request_status(id, request_failed);
271 valid_response = true;
272 mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
274 else if (req_status > 0){
275 // we don't changet status since it was not in pending
277 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
280 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
285 else if(procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
287 res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
289 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
291 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
293 int req_status = get_request_status(id);
294 if(req_status == delete_request_pending){
295 set_request_status(id, delete_request_failed);
296 mdclog_write(MDCLOG_INFO, "Subscription delete request %s,%d failed", std::get<0>(id).c_str(), std::get<1>(id));
297 valid_response = true;
299 else if (req_status > 0){
300 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
303 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s,%d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
309 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU failure response with a non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
314 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU with non response type %d", __FILE__, __LINE__, type);
318 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
320 // wake up all waiting users ...
322 _cv.get()->notify_all();
328 int const SubscriptionHandler::get_request_status(subscription_identifier id){
329 auto search = requests_table.find(id);
330 if (search == requests_table.end()){
334 return search->second;
337 bool SubscriptionHandler::is_subscription_entry(subscription_identifier id){
338 auto search = subscription_responses.find(id);
339 if (search != subscription_responses.end())
345 bool SubscriptionHandler::is_request_entry(subscription_identifier id){
346 auto search = requests_table.find(id);
347 if (search != requests_table.end())
354 void SubscriptionHandler::get_subscription_keys(std::vector<subscription_identifier> & key_list){
355 for(auto & e: subscription_responses){
356 key_list.push_back(e.first);