/* ================================================================================== Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ================================================================================== */ /* Author : Ashwin Sridharan Date : Feb 2019 */ #pragma once #ifndef SUBSCRIPTION_HANDLER #define SUBSCRIPTION_HANDLER #include #include #include #include #include #include #include #include #include #include #include #define SUBSCR_SUCCESS 0 #define SUBSCR_ERR_TX 1 #define SUBSCR_ERR_TIMEOUT 2 #define SUBSCR_ERR_FAIL 3 #define SUBSCR_ERR_UNKNOWN 4 #define SUBSCR_ERR_DUPLICATE 5 #define SUBSCR_ERR_ENCODE 6 #define SUBSCR_ERR_MISSING 7 using namespace std; typedef enum { request_pending = 1, request_success, request_failed, delete_request_pending, delete_request_success, delete_request_failed, request_duplicate }Subscription_Status_Types; using subscription_identifier = std::tuple; /* Class to process subscription related messages The class provides mechanism to send and process subscriptions and subscription deletes. NOTE 1: It is currently unclear how an xAPP should identify a subscription request/response pair uniquely in the absence/presence of subscription manager. Ideally, the subscription manager should be transperent to the xAPP but that may not be the case, i.e the subscription manager may the subscription request id fields. The xAPP needs to identify uniquely not just the subscription response, but also when it needs to send a delete for the corresponding request. From that perspective, the fields present in both the subscription response and a delete request are the RICrequestId and RANfunctionId. However, the subscription manager may require that the RICrequestId fields be set to a specific value (TBD). Hence for current purposes, a RIC subscription request is uniquely identified by the tuple . This is not ideal, since potentially the same RANfunctionID may be subscribed to in different modes, but for now this is the constraint. NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID. However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR related semantics, but simply accepts a function to send the request/delete request that accepts a signature Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. NOTE 3: The subscription handler is thread-safe, i.e multiple elements can request subscriptions/subscription deletes from multiple threads. However this does not preclude conflict if multiple threads are trying to make subscriptions based on the same triplet (in which cases, results will be internally consistent, but may yield errors to calling agent). */ struct subscription_hasher { size_t operator()(const subscription_identifier & key) const { return std::hash{}(std::get<0>(key) + std::to_string(std::get<1>(key))); } }; class subscription_handler { public: subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2); void init(void); template int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); template int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); void Response(int, unsigned char *, int, const char *); int const get_request_status(subscription_identifier); subscription_response_helper * const get_subscription(subscription_identifier); unsigned int get_next_id(void); void set_timeout(unsigned int); void set_num_retries(unsigned int); bool is_subscription_entry(subscription_identifier); bool is_request_entry(subscription_identifier); void get_subscription_keys(std::vector &); void clear(void); size_t num_pending(void) const; size_t num_complete(void) const ; private: bool add_request_entry(subscription_identifier, int); bool set_request_status(subscription_identifier, int); bool delete_request_entry(subscription_identifier); bool get_subscription_entry(subscription_identifier); bool add_subscription_entry(subscription_identifier, subscription_response_helper &he); bool delete_subscription_entry(subscription_identifier); std::unordered_map requests_table; std::unordered_map subscription_responses; // stores list of successful subscriptions std::unique_ptr _data_lock; std::unique_ptr _cv; std::chrono::seconds _time_out; unsigned int _num_retries = 2; unsigned int unique_request_id = 0; }; template int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; unsigned char buffer[512]; size_t buf_len = 512; // As per current design, request id and request sequence number // must be set to zero ... he.set_request(0, 0); subscription_request e2ap_sub_req; // generate subscription identifier subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); // generate the request pdu res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he); if(! res){ mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__); return SUBSCR_ERR_ENCODE; } // put entry in request table { std::lock_guard lock(*(_data_lock.get())); res = add_request_entry(sub_id, request_pending); if(! res){ mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_DUPLICATE; } } // acquire lock ... std::unique_lock _local_lock(*(_data_lock.get())); // Send the message res = tx(TxCode, buf_len, buffer); if (!res){ // clear state delete_request_entry(sub_id); mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); return SUBSCR_ERR_TX; }; // record time stamp .. auto start = std::chrono::system_clock::now(); res = SUBSCR_ERR_UNKNOWN; while(1){ // release lock and wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // we have woken and acquired data_lock // check status and return appropriate object int status = get_request_status(sub_id); if (status == request_success){ // retreive & store the subscription response response = subscription_responses[sub_id]; mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } if (status == request_pending){ // woken up spuriously or timed out auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; if ( f > _num_retries * _time_out){ mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; break; } else{ mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); continue; } } if(status == request_failed){ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } if (status == request_duplicate){ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_DUPLICATE; break; } // if we are here, some spurious // status obtained or request failed . we return appropriate error code mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; delete_request_entry(sub_id); // release data lock _local_lock.unlock(); std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl; return res; }; template int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; // generate subscription identifier subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); // First check if we have this subscription if(! is_subscription_entry(sub_id)){ mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_MISSING; } // Also check if such a request is queued if (is_request_entry(sub_id)){ mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_DUPLICATE; } subscription_delete e2ap_sub_req_del; // generate the delete request pdu unsigned char buffer[128]; size_t buf_len = 128; res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he); if(! res){ mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str()); return SUBSCR_ERR_ENCODE; } // put entry in request table { std::lock_guard lock(*(_data_lock.get())); res = add_request_entry(sub_id, delete_request_pending); if(!res){ mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); return SUBSCR_ERR_DUPLICATE; } } std::unique_lock _local_lock(*(_data_lock.get())); // Send the message res = tx(TxCode, buf_len, buffer); if (!res){ delete_request_entry(sub_id); mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_TX; }; // record time stamp .. auto start = std::chrono::system_clock::now(); res = SUBSCR_ERR_UNKNOWN; while(1){ // wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // check status and return appropriate object int status = get_request_status(sub_id); if (status == delete_request_success){ mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } if (status == delete_request_pending){ // woken up spuriously or timed out auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; if (f > _num_retries * _time_out){ mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; break; } else{ mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); } continue; } if(status == delete_request_failed){ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } // if we are here, some spurious // status obtained. we return false mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; delete_request_entry(sub_id); // release data lock _local_lock.unlock(); std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl; return res; }; #endif