/*
==================================================================================
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include <chrono>
#include <tuple>
-#include "../xapp-formats/e2ap/subscription_delete_request.hpp"
-#include "../xapp-formats/e2ap/subscription_delete_response.hpp"
-#include "../xapp-formats/e2ap/subscription_request.hpp"
-#include "../xapp-formats/e2ap/subscription_response.hpp"
+#include "../xapp-asn/e2ap/subscription_delete_request.hpp"
+#include "../xapp-asn/e2ap/subscription_delete_response.hpp"
+#include "../xapp-asn/e2ap/subscription_request.hpp"
+#include "../xapp-asn/e2ap/subscription_response.hpp"
-#define SUBSCR_SUCCESS 0
-#define SUBSCR_ERR_TX 1
-#define SUBSCR_ERR_TIMEOUT 2
-#define SUBSCR_ERR_FAIL 3
-#define SUBSCR_ERR_UNKNOWN 4
-#define SUBSCR_ERR_DUPLICATE 5
-#define SUBSCR_ERR_ENCODE 6
-#define SUBSCR_ERR_MISSING 7
+#define SUBSCR_SUCCESS 1
+#define SUBSCR_ERR_TX -1
+#define SUBSCR_ERR_TIMEOUT -2
+#define SUBSCR_ERR_FAIL -3
+#define SUBSCR_ERR_UNKNOWN -4
+#define SUBSCR_ERR_DUPLICATE -5
using namespace std;
+class TransmitterBase
+{
+public:
+ virtual ~TransmitterBase() {}
+
+ template<class T>
+ const T& getParam() const; //to be implemented after Parameter
+
+ template<class T, class U>
+ void setParam(const U& rhs); //to be implemented after Parameter
+};
+
+template <typename T>
+class Transmitter : public TransmitterBase
+{
+public:
+ Transmitter(const T& tx) :obj(tx) {}
+ const T& getParam() const {return obj;}
+ void setParam(const T& tx) {obj=tx;}
+private:
+ T obj;
+};
+
+//Here's the trick: dynamic_cast rather than virtual
+template<class T> const T& TransmitterBase::getParam() const
+{
+ return dynamic_cast<const Transmitter<T>&>(*this).getParam();
+}
+template<class T, class U> void TransmitterBase::setParam(const U& rhs)
+{
+ dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
+ return;
+}
+
typedef enum {
request_pending = 1,
request_success,
request_duplicate
}Subscription_Status_Types;
-using subscription_identifier = std::string; //here subscription_identifier is the rmr transaction id.
+using transaction_identifier = unsigned char*;
+using transaction_status = Subscription_Status_Types;
class SubscriptionHandler {
public:
- SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
+ SubscriptionHandler(unsigned int timeout_seconds = 10);
- void init(void);
+ template <typename AppTransmitter>
+ int manage_subscription_request(transaction_identifier, AppTransmitter &&);
- template <typename Transmitter>
- int request_subscription(std::string, Transmitter &&);
+ template <typename AppTransmitter>
+ int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
- template<typename Transmitter>
- int request_subscription_delete(std::string, int , Transmitter &&);
+ void manage_subscription_response(int message_type, transaction_identifier id);
- void Response(int, unsigned char *, int, const char *);
- int const get_request_status(subscription_identifier);
- subscription_response_helper * const get_subscription(subscription_identifier);
-
- unsigned int get_next_id(void);
+ int const get_request_status(transaction_identifier);
+ bool set_request_status(transaction_identifier, transaction_status);
+ bool is_request_entry(transaction_identifier);
void set_timeout(unsigned int);
- void set_num_retries(unsigned int);
-
- bool is_subscription_entry(subscription_identifier);
- bool is_request_entry(subscription_identifier);
- void get_subscription_keys(std::vector<subscription_identifier> &);
void clear(void);
- size_t num_pending(void) const;
- size_t num_complete(void) const ;
-
+ void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
-
private:
-
-
- bool add_request_entry(subscription_identifier, int);
- bool set_request_status(subscription_identifier, int);
- bool delete_request_entry(subscription_identifier);
-
- bool get_subscription_entry(subscription_identifier);
- bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
- bool delete_subscription_entry(subscription_identifier);
-
- std::unordered_map<subscription_identifier, int> requests_table;
+ bool add_request_entry(transaction_identifier, transaction_status);
+ bool delete_request_entry(transaction_identifier);
+
+ template <typename AppTransmitter>
+ bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
+
+ std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
+ std::unordered_map<transaction_identifier, transaction_status> status_table;
+
std::unique_ptr<std::mutex> _data_lock;
std::unique_ptr<std::condition_variable> _cv;
std::chrono::seconds _time_out;
- unsigned int _num_retries = 2;
- unsigned int unique_request_id = 0;
+ bool _ignore_subs_resp = false;
};
-template <typename Transmitter>
-int SubscriptionHandler::request_subscription(std::string rmr_trans_id, Transmitter && tx){
- bool res;
+template <typename AppTransmitter>
+bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
+
+ // add entry in hash table if it does not exist
+ auto search = trans_table.find(id);
+ if(search != trans_table.end()){
+ return false;
+ }
+ Transmitter<AppTransmitter> tptr(trans);
+ trans_table[id] = tptr;
+ return true;
+
+};
+
+//this will work for both sending subscription request and subscription delete request.
+//The handler is oblivious of the message content and follows the transaction id.
+template<typename AppTransmitter>
+int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
+ int res;
// put entry in request table
{
std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+
res = add_request_entry(rmr_trans_id, request_pending);
if(! res){
mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
}
}
+
// acquire lock ...
std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
// Send the message
- res = tx();
+ bool flg = tx();
- if (!res){
+ if (!flg){
// clear state
delete_request_entry(rmr_trans_id);
mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
return SUBSCR_ERR_TX;
- };
+ } else {
+ mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id );
+ add_transmitter_entry(rmr_trans_id, tx);
+ }
// record time stamp ..
auto start = std::chrono::system_clock::now();
res = SUBSCR_ERR_UNKNOWN;
- while(1){
- // release lock and wait to be woken up
- _cv.get()->wait_for(_local_lock, _time_out);
-
- // we have woken and acquired data_lock
- // check status and return appropriate object
-
- int status = get_request_status(rmr_trans_id);
- if (status == request_success){
- // retreive & store the subscription response (why?)
- // response = subscription_responses[sub_id];
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", rmr_trans_id);
- res = SUBSCR_SUCCESS;
- break;
- }
-
- if (status == request_pending){
- // woken up spuriously or timed out
- auto end = std::chrono::system_clock::now();
- std::chrono::duration<double> f = end - start;
-
- if ( f > _num_retries * _time_out){
- mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
- res = SUBSCR_ERR_TIMEOUT;
- break;
- }
- else{
- mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response ....", rmr_trans_id);
- continue;
- }
- }
-
- if(status == request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
- res = SUBSCR_ERR_FAIL;
- break;
- }
-
- if (status == request_duplicate){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
- res = SUBSCR_ERR_DUPLICATE;
- break;
-
- }
-
- // if we are here, some spurious
- // status obtained or request failed . we return appropriate error code
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
- res = SUBSCR_ERR_UNKNOWN;
- break;
+ while(1){
+ // release lock and wait to be woken up
+ _cv.get()->wait_for(_local_lock, _time_out);
+
+ // we have woken and acquired data_lock
+ // check status and return appropriate object
+ int status = get_request_status(rmr_trans_id);
+
+ if (status == request_success){
+ mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id);
+ res = SUBSCR_SUCCESS;
+ break;
+ }
+
+ if (status == request_pending){
+ // woken up spuriously or timed out
+ auto end = std::chrono::system_clock::now();
+ std::chrono::duration<double> f = end - start;
+
+ if ( f > _time_out){
+
+ mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
+
+ //res = SUBSCR_ERR_TIMEOUT;
+ //sunny side scenario. assuming subscription response is received.
+ res = SUBSCR_SUCCESS;
+ break;
+ }
+ else{
+ mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id);
+ continue;
+ }
+
+ }
+
+ if(status == request_failed){
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
+ res = SUBSCR_ERR_FAIL;
+ break;
+ }
+
+ if (status == request_duplicate){
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
+ res = SUBSCR_ERR_DUPLICATE;
+ break;
+
+ }
+
+ // if we are here, some spurious
+ // status obtained or request failed . we return appropriate error code
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
+ res = SUBSCR_ERR_UNKNOWN;
+ break;
};
delete_request_entry(rmr_trans_id);
// release data lock
_local_lock.unlock();
- std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
+ // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
return res;
};