#include <chrono>
#include <tuple>
-#include "subscription_delete_request.hpp"
-#include "subscription_delete_response.hpp"
-#include "subscription_request.hpp"
-#include "subscription_response.hpp"
+#include "../xapp-formats/e2ap/subscription_delete_request.hpp"
+#include "../xapp-formats/e2ap/subscription_delete_response.hpp"
+#include "../xapp-formats/e2ap/subscription_request.hpp"
+#include "../xapp-formats/e2ap/subscription_response.hpp"
#define SUBSCR_SUCCESS 0
#define SUBSCR_ERR_TX 1
request_duplicate
}Subscription_Status_Types;
-using subscription_identifier = std::tuple<std::string , int>;
-
-struct subscription_hasher {
- size_t operator()(const subscription_identifier & key) const {
- return std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
- }
-};
+using subscription_identifier = std::string; //here subscription_identifier is the rmr transaction id.
class SubscriptionHandler {
void init(void);
template <typename Transmitter>
- int request_subscription(std::string, int , Transmitter &&);
+ int request_subscription(std::string, Transmitter &&);
template<typename Transmitter>
- int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
+ int request_subscription_delete(std::string, int , Transmitter &&);
void Response(int, unsigned char *, int, const char *);
int const get_request_status(subscription_identifier);
bool get_subscription_entry(subscription_identifier);
bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
bool delete_subscription_entry(subscription_identifier);
-
- std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
- std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
+
+ std::unordered_map<subscription_identifier, int> requests_table;
std::unique_ptr<std::mutex> _data_lock;
std::unique_ptr<std::condition_variable> _cv;
};
template <typename Transmitter>
-int SubscriptionHandler::request_subscription(std::string node_id, int msgcode, Transmitter && tx){
-
- // generate subscription identifier
- subscription_identifier sub_id = std::make_tuple (node_id, 0); //0 is the function id which is hardcoded, which should come from rnib
-
-
+int SubscriptionHandler::request_subscription(std::string rmr_trans_id, Transmitter && tx){
bool res;
+
// put entry in request table
{
std::lock_guard<std::mutex> lock(*(_data_lock.get()));
- res = add_request_entry(sub_id, request_pending);
+ res = add_request_entry(rmr_trans_id, request_pending);
if(! res){
- mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
return SUBSCR_ERR_DUPLICATE;
}
}
// acquire lock ...
std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
-
// Send the message
res = tx();
if (!res){
// clear state
- delete_request_entry(sub_id);
- mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
+ delete_request_entry(rmr_trans_id);
+ mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
return SUBSCR_ERR_TX;
};
// we have woken and acquired data_lock
// check status and return appropriate object
- int status = get_request_status(sub_id);
+ int status = get_request_status(rmr_trans_id);
if (status == request_success){
// retreive & store the subscription response (why?)
// response = subscription_responses[sub_id];
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", rmr_trans_id);
res = SUBSCR_SUCCESS;
break;
}
std::chrono::duration<double> f = end - start;
if ( f > _num_retries * _time_out){
- mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
res = SUBSCR_ERR_TIMEOUT;
break;
}
else{
- mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response ....", rmr_trans_id);
continue;
}
}
if(status == request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
res = SUBSCR_ERR_FAIL;
break;
}
if (status == request_duplicate){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
res = SUBSCR_ERR_DUPLICATE;
break;
// if we are here, some spurious
// status obtained or request failed . we return appropriate error code
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
res = SUBSCR_ERR_UNKNOWN;
break;
};
- delete_request_entry(sub_id);
+ delete_request_entry(rmr_trans_id);
// release data lock
_local_lock.unlock();
- std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl;
+ std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
return res;
};
-
-/*template <typename Transmitter>
-int SubscriptionHandler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
-
- int res;
- // generate subscription identifier
- subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
-
- // First check if we have this subscription
- if(! is_subscription_entry(sub_id)){
- mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- return SUBSCR_ERR_MISSING;
- }
-
- // Also check if such a request is queued
- if (is_request_entry(sub_id)){
- mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- return SUBSCR_ERR_DUPLICATE;
- }
-
- subscription_delete e2ap_sub_req_del;
-
- // generate the delete request pdu
- unsigned char buffer[128];
- size_t buf_len = 128;
-
- res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
- if(! res){
- mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
- return SUBSCR_ERR_ENCODE;
- }
-
- // put entry in request table
- {
- std::lock_guard<std::mutex> lock(*(_data_lock.get()));
- res = add_request_entry(sub_id, delete_request_pending);
- if(!res){
- mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
- return SUBSCR_ERR_DUPLICATE;
- }
- }
-
- std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
-
- // Send the message
- res = tx(TxCode, buf_len, buffer);
-
- if (!res){
- delete_request_entry(sub_id);
- mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- return SUBSCR_ERR_TX;
- };
-
-
- // record time stamp ..
- auto start = std::chrono::system_clock::now();
-
- res = SUBSCR_ERR_UNKNOWN;
- while(1){
-
- // wait to be woken up
- _cv.get()->wait_for(_local_lock, _time_out);
-
- // check status and return appropriate object
- int status = get_request_status(sub_id);
- if (status == delete_request_success){
- mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- res = SUBSCR_SUCCESS;
- break;
- }
-
- if (status == delete_request_pending){
- // woken up spuriously or timed out
- auto end = std::chrono::system_clock::now();
- std::chrono::duration<double> f = end - start;
-
- if (f > _num_retries * _time_out){
- mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- res = SUBSCR_ERR_TIMEOUT;
- break;
- }
- else{
- mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- }
-
- continue;
- }
-
- if(status == delete_request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
- res = SUBSCR_ERR_FAIL;
- break;
- }
-
- // if we are here, some spurious
- // status obtained. we return false
-
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
- res = SUBSCR_ERR_UNKNOWN;
- break;
-
- };
-
- delete_request_entry(sub_id);
-
- // release data lock
- _local_lock.unlock();
- std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
- return res;
-};*/
-
#endif