#ifndef SUBSCRIPTION_HANDLER
#define SUBSCRIPTION_HANDLER
+#include <functional>
#include <mdclog/mdclog.h>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <chrono>
+#include <tuple>
#include <subscription_request.hpp>
#include <subscription_response.hpp>
request_duplicate
}Subscription_Status_Types;
+using subscription_identifier = std::tuple<std::string , int>;
/* Class to process subscription related messages
- each subscription request is assigned a unique internally
-generated request id for tracking purposes. this is because
-the e2 subscription request does not carry any gnodeb id information
+ The class provides mechanism to send and process
+ subscriptions and subscription deletes.
+
+ NOTE 1: It is currently unclear how an xAPP should identify a
+ subscription request/response pair uniquely in the absence/presence of
+ subscription manager. Ideally, the subscription manager should be
+ transperent to the xAPP but that may not be the case, i.e the
+ subscription manager may the subscription request id fields.
+ The xAPP needs to identify uniquely not just the subscription response, but
+ also when it needs to send a delete for the corresponding request.
+ From that perspective, the fields present in both the subscription response and
+ a delete request are the RICrequestId and RANfunctionId. However, the subscription manager
+ may require that the RICrequestId fields be set to a specific value (TBD). Hence
+ for current purposes, a RIC subscription request is uniquely identified by the
+ tuple <gNodeB-ID, RANfunctionId>. This is not ideal, since potentially the same RANfunctionID
+ may be subscribed to in different modes, but for now this is the constraint.
+
+
+ NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID.
+ However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission
+ medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR
+ related semantics, but simply accepts a function to send the request/delete request that accepts a signature
+ Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response.
+
+
+ NOTE 3: The subscription handler is thread-safe, i.e multiple elements
+ can request subscriptions/subscription deletes from multiple threads. However
+ this does not preclude conflict if multiple threads are trying to make
+ subscriptions based on the same triplet (in which cases, results will be internally
+ consistent, but may yield errors to calling agent).
*/
-class subscription_handler {
+struct subscription_hasher {
+ size_t operator()(const subscription_identifier & key) const {
+ return std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
+ }
+};
+class subscription_handler {
+
public:
- subscription_handler(void);
- subscription_handler(unsigned int, unsigned int);
+
+ subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
void init(void);
+
template <typename Transmitter>
- int RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
+ int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
template<typename Transmitter>
- int RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&);
-
-
- void Response(int, unsigned char *, int);
- int const get_request_status(int);
- subscription_response_helper * const get_subscription(int);
+ int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
+ void Response(int, unsigned char *, int, const char *);
+ int const get_request_status(subscription_identifier);
+ subscription_response_helper * const get_subscription(subscription_identifier);
+
unsigned int get_next_id(void);
void set_timeout(unsigned int);
void set_num_retries(unsigned int);
- bool is_subscription_entry(int);
- bool is_request_entry(int);
-
+ bool is_subscription_entry(subscription_identifier);
+ bool is_request_entry(subscription_identifier);
+ void get_subscription_keys(std::vector<subscription_identifier> &);
void clear(void);
size_t num_pending(void) const;
size_t num_complete(void) const ;
private:
- bool add_request_entry(int, int);
- bool set_request_status(int, int);
- bool delete_request_entry(int);
+
+ bool add_request_entry(subscription_identifier, int);
+ bool set_request_status(subscription_identifier, int);
+ bool delete_request_entry(subscription_identifier);
- bool get_subscription_entry(int, int);
- bool add_subscription_entry(int, subscription_response_helper &he);
- bool delete_subscription_entry(int);
+ bool get_subscription_entry(subscription_identifier);
+ bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
+ bool delete_subscription_entry(subscription_identifier);
- std::unordered_map<int, int> requests_table;
- std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
+ std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
+ std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
std::unique_ptr<std::mutex> _data_lock;
std::unique_ptr<std::condition_variable> _cv;
};
template <typename Transmitter>
-int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
+int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
int res;
unsigned char buffer[512];
size_t buf_len = 512;
- // get a new unique request id ...
- unsigned int new_req_id = get_next_id();
- mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
- he.set_request(new_req_id, he.get_req_seq());
-
-
+ // As per current design, request id and request sequence number
+ // must be set to zero ...
+ he.set_request(0, 0);
subscription_request e2ap_sub_req;
+
+ // generate subscription identifier
+ subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
// generate the request pdu
res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
// put entry in request table
{
std::lock_guard<std::mutex> lock(*(_data_lock.get()));
- res = add_request_entry(he.get_request_id(), request_pending);
+ res = add_request_entry(sub_id, request_pending);
if(! res){
- mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue", __FILE__, __LINE__, he.get_request_id());
- return SUBSCR_ERR_UNKNOWN;
+ mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ return SUBSCR_ERR_DUPLICATE;
}
}
res = tx(TxCode, buf_len, buffer);
if (!res){
// clear state
- delete_request_entry(he.get_request_id());
- mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__, he.get_request_id());
+ delete_request_entry(sub_id);
+ mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
return SUBSCR_ERR_TX;
};
res = SUBSCR_ERR_UNKNOWN;
while(1){
-
-
// release lock and wait to be woken up
_cv.get()->wait_for(_local_lock, _time_out);
// we have woken and acquired data_lock
// check status and return appropriate object
- int status = get_request_status(he.get_request_id());
+ int status = get_request_status(sub_id);
if (status == request_success){
-
// retreive & store the subscription response
- response = subscription_responses[he.get_request_id()];
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
+ response = subscription_responses[sub_id];
+ mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_SUCCESS;
break;
}
if (status == request_pending){
-
// woken up spuriously or timed out
auto end = std::chrono::system_clock::now();
std::chrono::duration<double> f = end - start;
if ( f > _num_retries * _time_out){
- mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
+ mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_ERR_TIMEOUT;
- std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl;
break;
}
else{
- mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());
+ mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
continue;
}
}
if(status == request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_ERR_FAIL;
break;
}
+ if (status == request_duplicate){
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ res = SUBSCR_ERR_DUPLICATE;
+ break;
+
+ }
+
// if we are here, some spurious
- // status obtained or request failed . we return false
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
+ // status obtained or request failed . we return appropriate error code
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
res = SUBSCR_ERR_UNKNOWN;
break;
};
- delete_request_entry(he.get_request_id());
+ delete_request_entry(sub_id);
// release data lock
_local_lock.unlock();
- std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl;
+ std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl;
return res;
};
template <typename Transmitter>
-int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
+int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
int res;
-
+ // generate subscription identifier
+ subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
+
// First check if we have this subscription
- if(! is_subscription_entry(he.get_request_id())){
- mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id());
+ if(! is_subscription_entry(sub_id)){
+ mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
return SUBSCR_ERR_MISSING;
}
// Also check if such a request is queued
- if (is_request_entry(he.get_request_id())){
- mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id());
- return SUBSCR_ERR_UNKNOWN;
+ if (is_request_entry(sub_id)){
+ mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+ return SUBSCR_ERR_DUPLICATE;
}
subscription_delete e2ap_sub_req_del;
}
// put entry in request table
- res = add_request_entry(he.get_request_id(), delete_request_pending);
- if(! res){
- mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id());
- return SUBSCR_ERR_UNKNOWN;
+ {
+ std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+ res = add_request_entry(sub_id, delete_request_pending);
+ if(!res){
+ mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
+ return SUBSCR_ERR_DUPLICATE;
+ }
}
std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
res = tx(TxCode, buf_len, buffer);
if (!res){
- delete_request_entry(he.get_request_id());
- mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
+ delete_request_entry(sub_id);
+ mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
return SUBSCR_ERR_TX;
};
_cv.get()->wait_for(_local_lock, _time_out);
// check status and return appropriate object
- int status = get_request_status(he.get_request_id());
+ int status = get_request_status(sub_id);
if (status == delete_request_success){
- mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
+ mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_SUCCESS;
break;
}
std::chrono::duration<double> f = end - start;
if (f > _num_retries * _time_out){
- mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
+ mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_ERR_TIMEOUT;
break;
}
else{
- mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id());
+ mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
}
continue;
}
if(status == delete_request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
res = SUBSCR_ERR_FAIL;
break;
}
+
// if we are here, some spurious
// status obtained. we return false
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
+ mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
res = SUBSCR_ERR_UNKNOWN;
break;
};
- delete_request_entry(he.get_request_id());
+ delete_request_entry(sub_id);
// release data lock
_local_lock.unlock();
- std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl;
+ std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
return res;
};