2 ==================================================================================
\r
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
\r
5 Licensed under the Apache License, Version 2.0 (the "License");
\r
6 you may not use this file except in compliance with the License.
\r
7 You may obtain a copy of the License at
\r
9 http://www.apache.org/licenses/LICENSE-2.0
\r
11 Unless required by applicable law or agreed to in writing, software
\r
12 distributed under the License is distributed on an "AS IS" BASIS,
\r
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
14 See the License for the specific language governing permissions and
\r
15 limitations under the License.
\r
16 ==================================================================================
\r
19 #include <subscription_handler.hpp>
\r
22 SubscriptionHandler::SubscriptionHandler(void){
\r
25 _time_out = std::chrono::seconds(5);
\r
29 // unsigned char buffer[128];
\r
30 // size_t buf_len = 128;
\r
32 // E2AP_PDU_t e2ap_pdu;
\r
33 // subscription_request e2ap_sub_req;
\r
35 // int request_id = 2;
\r
37 // int function_id = 0;
\r
38 // int action_id = 0;
\r
39 // int action_type = 0;
\r
40 // int message_type = 1;
\r
42 // subscription_helper sgnb_add_subscr_req;
\r
44 // //sgnb_add_subscr_req.clear();
\r
45 // sgnb_add_subscr_req.set_request(request_id, req_seq);
\r
46 // sgnb_add_subscr_req.set_function_id(function_id);
\r
47 // sgnb_add_subscr_req.add_action(action_id, action_type);
\r
48 // std::string test = "This is a test";
\r
49 // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length());
\r
50 // std::cout <<"Constructor ........" << std::endl;
\r
51 // // generate the request pdu
\r
52 // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req);
\r
54 // mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
\r
57 // std::cout <<"Encoded subscription request pdu " << std::endl;
\r
62 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){
\r
68 void SubscriptionHandler::init(void){
\r
70 _data_lock = std::make_unique<std::mutex>();
\r
71 _cv = std::make_unique<std::condition_variable>();
\r
75 void SubscriptionHandler::clear(void){
\r
77 std::lock_guard<std::mutex> lock(*(_data_lock).get());
\r
78 requests_table.clear();
\r
79 subscription_responses.clear();
\r
84 size_t SubscriptionHandler::num_pending(void) const {
\r
85 return requests_table.size();
\r
88 size_t SubscriptionHandler::num_complete(void) const {
\r
89 return subscription_responses.size();
\r
93 void SubscriptionHandler::set_timeout(unsigned int timeout_seconds){
\r
94 _time_out = std::chrono::seconds(timeout_seconds);
\r
97 void SubscriptionHandler::set_timeout_flag(bool val){
\r
98 _time_out_flag = val;
\r
101 void SubscriptionHandler::set_num_retries(unsigned int num_tries){
\r
102 _num_retries = num_tries;
\r
106 unsigned int SubscriptionHandler::get_next_id(void){
\r
107 std::lock_guard<std::mutex> lock(*(_data_lock).get());
\r
108 unique_request_id ++;
\r
109 return unique_request_id;
\r
112 bool SubscriptionHandler::add_request_entry(int id, int status){
\r
114 // add entry in hash table if it does not exist
\r
115 auto search = requests_table.find(id);
\r
116 if(search != requests_table.end()){
\r
120 requests_table[id] = status;
\r
125 bool SubscriptionHandler::set_request_status(int id, int status){
\r
127 // change status of a request only if it exists.
\r
129 auto search = requests_table.find(id);
\r
130 if(search != requests_table.end()){
\r
131 requests_table[id] = status;
\r
140 bool SubscriptionHandler::delete_request_entry(int id){
\r
142 auto search = requests_table.find(id);
\r
143 if (search != requests_table.end()){
\r
144 requests_table.erase(search);
\r
151 bool SubscriptionHandler::add_subscription_entry(int id, subscription_response_helper &he){
\r
153 auto search = subscription_responses.find(id);
\r
154 if (search == subscription_responses.end()){
\r
155 subscription_responses[id] = he;
\r
163 bool SubscriptionHandler::delete_subscription_entry(int id){
\r
165 auto search = subscription_responses.find(id);
\r
166 if(search == subscription_responses.end()){
\r
170 subscription_responses.erase(search);
\r
176 subscription_response_helper * const SubscriptionHandler::get_subscription(int id){
\r
177 auto search = subscription_responses.find(id);
\r
178 if(search == subscription_responses.end()){
\r
182 return &(subscription_responses[id]);
\r
187 // Handles responses from RMR
\r
188 void SubscriptionHandler::Response(int message_type, unsigned char *payload, int payload_length){
\r
194 bool valid_response =false;
\r
196 E2AP_PDU_t * e2ap_recv;
\r
197 asn_dec_rval_t retval;
\r
199 subscription_response sub_resp;
\r
201 subscription_response_helper he_response;
\r
204 size_t buf_size = 512;
\r
207 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv), payload, payload_length);
\r
209 if(retval.code != RC_OK){
\r
210 mdclog_write(MDCLOG_ERR, "%s, %d: Error decoding E2AP PDU of RMR type %d. Bytes decoded = %lu out of %d\n", __FILE__, __LINE__, message_type, retval.consumed, payload_length);
\r
211 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
\r
215 type = e2ap_recv->present;
\r
216 mdclog_write(MDCLOG_INFO, "Received RMR message of type = %d", type);
\r
218 if(type == E2AP_PDU_PR_successfulOutcome){
\r
220 procedureCode = e2ap_recv->choice.successfulOutcome->procedureCode;
\r
221 mdclog_write(MDCLOG_INFO, "Received E2AP PDU successful outcome message with procedureCode = %d", procedureCode);
\r
223 if( procedureCode == ProcedureCode_id_ricSubscription){
\r
224 // subscription response
\r
225 // decode the message
\r
226 sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response);
\r
229 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
\r
231 id = he_response.get_request_id();
\r
232 // get status of id
\r
233 int req_status = get_request_status(id);
\r
234 if (req_status == request_pending ){
\r
235 res = add_subscription_entry(id, he_response);
\r
237 set_request_status(id, request_success);
\r
240 set_request_status(id, request_duplicate);
\r
241 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d seems to be a duplicate\n", __FILE__, __LINE__, id);
\r
244 valid_response = true;
\r
246 else if (req_status > 0){
\r
247 // we don't change status of response since it was not in pending
\r
249 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
\r
253 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription", __FILE__, __LINE__, id);
\r
261 mdclog_write(MDCLOG_ERR, "%s, %d: Subscription Handler Response received E2AP PDU success response with an non-subscription response related type %d", __FILE__, __LINE__, procedureCode);
\r
266 else if(type == E2AP_PDU_PR_unsuccessfulOutcome){
\r
268 procedureCode = e2ap_recv->choice.unsuccessfulOutcome->procedureCode;
\r
269 mdclog_write(MDCLOG_INFO, "Received E2AP PDU unsuccessful outcome message with procedureCode = %d", procedureCode);
\r
271 if(procedureCode == ProcedureCode_id_ricSubscription){
\r
273 sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response);
\r
275 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
\r
276 id = he_response.get_request_id();
\r
277 int req_status = get_request_status(id);
\r
278 if(req_status == request_pending){
\r
279 set_request_status(id, request_failed);
\r
280 valid_response = true;
\r
281 mdclog_write(MDCLOG_ERR, "Subscription request %d failed", id);
\r
283 else if (req_status > 0){
\r
284 // we don't changet status since it was not in pending
\r
286 mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status);
\r
289 mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription ", __FILE__, __LINE__, id);
\r
295 mdclog_write(MDCLOG_ERR, "%s, %d: Susbcription Handler Response received E2AP PDU with non response type %d", __FILE__, __LINE__, type);
\r
299 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv);
\r
301 // wake up all waiting users ...
\r
302 if(valid_response){
\r
303 _cv.get()->notify_all();
\r
309 int const SubscriptionHandler::get_request_status(int id){
\r
310 auto search = requests_table.find(id);
\r
311 if (search == requests_table.end()){
\r
315 return search->second;
\r
318 bool SubscriptionHandler::is_subscription_entry(int id){
\r
319 auto search = subscription_responses.find(id);
\r
320 if (search != subscription_responses.end())
\r
326 bool SubscriptionHandler::is_request_entry(int id){
\r
327 auto search = requests_table.find(id);
\r
328 if (search != requests_table.end())
\r