2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
23 #include "subs_mgmt.hpp"
28 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
32 void SubscriptionHandler::init(void){
34 _data_lock = std::make_unique<std::mutex>();
35 _cv = std::make_unique<std::condition_variable>();
39 void SubscriptionHandler::clear(void){
41 std::lock_guard<std::mutex> lock(*(_data_lock).get());
42 requests_table.clear();
47 size_t SubscriptionHandler::num_pending(void) const {
48 return requests_table.size();
51 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
52 _time_out = std::chrono::seconds(timeout_seconds);
55 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
56 _num_retries = num_tries;
61 bool SubscriptionHandler::add_request_entry(subscription_identifier id, int status){
63 // add entry in hash table if it does not exist
64 auto search = requests_table.find(id);
65 if(search != requests_table.end()){
69 requests_table[id] = status;
74 bool SubscriptionHandler::set_request_status(subscription_identifier id, int status){
76 // change status of a request only if it exists.
77 auto search = requests_table.find(id);
78 if(search != requests_table.end()){
79 requests_table[id] = status;
88 bool SubscriptionHandler::delete_request_entry(subscription_identifier id){
90 auto search = requests_table.find(id);
91 if (search != requests_table.end()){
92 requests_table.erase(search);
102 // Handles responses from RMR
103 /*void SubscriptionHandler::Response(int message_type, unsigned char *transaction_id, const char * node_id){
106 std::string node(node_id);
109 bool valid_response =false;
111 E2N_E2AP_PDU_t * e2ap_recv;
112 asn_dec_rval_t retval;
114 subscription_response sub_resp;
115 subscription_delete_response sub_del_resp;
117 subscription_response_helper he_response;
121 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
123 if(retval.code != RC_OK){
124 mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
125 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
129 type = e2ap_recv->present;
130 mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
132 if(type == E2N_E2AP_PDU_PR_successfulOutcome){
134 procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;
135 mdclog_write(MDCLOG_INFO, "Received E2N_E2AP PDU successful outcome message with procedureCode = %d", procedureCode);
137 if( procedureCode == E2N_ProcedureCode_id_ricSubscription){
138 // subscription response
139 // decode the message
140 sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
142 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
144 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
147 int req_status = get_request_status(id);
148 if (req_status == request_pending ){
149 res = add_subscription_entry(id, he_response);
151 set_request_status(id, request_success);
154 set_request_status(id, request_duplicate);
155 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d seems to be a duplicate. Subscription already present in subscription table\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
158 valid_response = true;
160 else if (req_status > 0){
161 // we don't change status of response since it was not in pending
163 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
167 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
174 else if( procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
176 res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
178 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
181 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
183 int req_status = get_request_status(id);
184 if (req_status == delete_request_pending ){
185 // Remove the subscription from the table
186 res = delete_subscription_entry(id);
188 set_request_status(id, delete_request_success);
189 valid_response = true;
192 set_request_status(id, delete_request_failed);
193 mdclog_write(MDCLOG_ERR, "%s, %d: Error deleting subscription entry for %s, %d", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
194 valid_response = true;
197 else if (req_status > 0){
198 // we don't change status since it was not in pending
200 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, std::get<0>(id).c_str(), std::get<1>(id));
203 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find request id %s, %d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
210 mdclog_write(MDCLOG_ERR, "%s, %d: Subscription Handler Response received E2AP PDU success response with an non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
215 else if(type == E2N_E2AP_PDU_PR_unsuccessfulOutcome){
217 procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
218 mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode);
220 if(procedureCode == E2N_ProcedureCode_id_ricSubscription){
222 sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
224 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
227 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
229 int req_status = get_request_status(id);
230 if(req_status == request_pending){
231 set_request_status(id, request_failed);
232 valid_response = true;
233 mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
235 else if (req_status > 0){
236 // we don't changet status since it was not in pending
238 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
241 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
246 else if(procedureCode == E2N_ProcedureCode_id_ricSubscriptionDelete){
248 res = sub_del_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
250 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
252 subscription_identifier id = std::make_tuple (node, he_response.get_request_id());
254 int req_status = get_request_status(id);
255 if(req_status == delete_request_pending){
256 set_request_status(id, delete_request_failed);
257 mdclog_write(MDCLOG_INFO, "Subscription delete request %s,%d failed", std::get<0>(id).c_str(), std::get<1>(id));
258 valid_response = true;
260 else if (req_status > 0){
261 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status);
264 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s,%d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id));
270 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU failure response with a non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
275 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU with non response type %d", __FILE__, __LINE__, type);
279 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv);
281 // wake up all waiting users ...
283 _cv.get()->notify_all();
289 int const SubscriptionHandler::get_request_status(subscription_identifier id){
290 auto search = requests_table.find(id);
291 if (search == requests_table.end()){
295 return search->second;
300 bool SubscriptionHandler::is_request_entry(subscription_identifier id){
301 auto search = requests_table.find(id);
302 if (search != requests_table.end())