X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2FE2AP-c%2Fsubscription%2Fsubscription_handler.hpp;h=0dd78a8c67b3464d5e11186217c78f693dc0276e;hb=refs%2Fchanges%2F72%2F2072%2F1;hp=05efa9b1cc024177daeb9c55768e731bbb7ebc03;hpb=0054ece5d9d4bcb28ecda2f0f36584f6a64fc869;p=ric-app%2Fadmin.git diff --git a/src/E2AP-c/subscription/subscription_handler.hpp b/src/E2AP-c/subscription/subscription_handler.hpp index 05efa9b..0dd78a8 100644 --- a/src/E2AP-c/subscription/subscription_handler.hpp +++ b/src/E2AP-c/subscription/subscription_handler.hpp @@ -26,17 +26,28 @@ #ifndef SUBSCRIPTION_HANDLER #define SUBSCRIPTION_HANDLER +#include #include #include #include #include #include +#include #include #include #include #include +#define SUBSCR_SUCCESS 0 +#define SUBSCR_ERR_TX 1 +#define SUBSCR_ERR_TIMEOUT 2 +#define SUBSCR_ERR_FAIL 3 +#define SUBSCR_ERR_UNKNOWN 4 +#define SUBSCR_ERR_DUPLICATE 5 +#define SUBSCR_ERR_ENCODE 6 +#define SUBSCR_ERR_MISSING 7 + using namespace std; typedef enum { @@ -49,40 +60,73 @@ typedef enum { request_duplicate }Subscription_Status_Types; +using subscription_identifier = std::tuple; /* Class to process subscription related messages - each subscription request is assigned a unique internally -generated request id for tracking purposes. this is because -the e2 subscription request does not carry any gnodeb id information + The class provides mechanism to send and process + subscriptions and subscription deletes. + + NOTE 1: It is currently unclear how an xAPP should identify a + subscription request/response pair uniquely in the absence/presence of + subscription manager. Ideally, the subscription manager should be + transperent to the xAPP but that may not be the case, i.e the + subscription manager may the subscription request id fields. + The xAPP needs to identify uniquely not just the subscription response, but + also when it needs to send a delete for the corresponding request. + From that perspective, the fields present in both the subscription response and + a delete request are the RICrequestId and RANfunctionId. However, the subscription manager + may require that the RICrequestId fields be set to a specific value (TBD). Hence + for current purposes, a RIC subscription request is uniquely identified by the + tuple . This is not ideal, since potentially the same RANfunctionID + may be subscribed to in different modes, but for now this is the constraint. + + + NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID. + However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission + medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR + related semantics, but simply accepts a function to send the request/delete request that accepts a signature + Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. + + + NOTE 3: The subscription handler is thread-safe, i.e multiple elements + can request subscriptions/subscription deletes from multiple threads. However + this does not preclude conflict if multiple threads are trying to make + subscriptions based on the same triplet (in which cases, results will be internally + consistent, but may yield errors to calling agent). */ -class SubscriptionHandler { +struct subscription_hasher { + size_t operator()(const subscription_identifier & key) const { + return std::hash{}(std::get<0>(key) + std::to_string(std::get<1>(key))); + } +}; +class subscription_handler { + public: - SubscriptionHandler(void); - SubscriptionHandler(unsigned int, unsigned int); + + subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2); void init(void); + template - bool RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&); + int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); template - bool RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&); - - - void Response(int, unsigned char *, int); - int const get_request_status(int); - subscription_response_helper * const get_subscription(int); + int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); + void Response(int, unsigned char *, int, const char *); + int const get_request_status(subscription_identifier); + subscription_response_helper * const get_subscription(subscription_identifier); + unsigned int get_next_id(void); void set_timeout(unsigned int); - void set_timeout_flag(bool); void set_num_retries(unsigned int); - bool is_subscription_entry(int); - bool is_request_entry(int); - + bool is_subscription_entry(subscription_identifier); + bool is_request_entry(subscription_identifier); + void get_subscription_keys(std::vector &); void clear(void); size_t num_pending(void) const; size_t num_complete(void) const ; @@ -91,62 +135,56 @@ public: private: - void ProcessSubscriptionResponse(unsigned char *, int len); - void ProcessSubscriptionFailure(unsigned char *, int len); - - void ProcessDeleteResponse(unsigned char *, int len); - void ProcessSubscriptionDeleteFailure(unsigned char *, int len); - - bool add_request_entry(int, int); - bool set_request_status(int, int); - bool delete_request_entry(int); + + bool add_request_entry(subscription_identifier, int); + bool set_request_status(subscription_identifier, int); + bool delete_request_entry(subscription_identifier); - bool get_subscription_entry(int, int); - bool add_subscription_entry(int, subscription_response_helper &he); - bool delete_subscription_entry(int); + bool get_subscription_entry(subscription_identifier); + bool add_subscription_entry(subscription_identifier, subscription_response_helper &he); + bool delete_subscription_entry(subscription_identifier); - std::unordered_map requests_table; - std::unordered_map subscription_responses; // stores list of successful subscriptions + std::unordered_map requests_table; + std::unordered_map subscription_responses; // stores list of successful subscriptions std::unique_ptr _data_lock; std::unique_ptr _cv; std::chrono::seconds _time_out; unsigned int _num_retries = 2; - bool _time_out_flag = true; unsigned int unique_request_id = 0; }; template -bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ - bool res; + int res; unsigned char buffer[512]; size_t buf_len = 512; - // get a new unique request id ... - unsigned int new_req_id = get_next_id(); - mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id); - he.set_request(new_req_id, he.get_req_seq()); - - E2AP_PDU_t *e2ap_pdu = 0; + // As per current design, request id and request sequence number + // must be set to zero ... + he.set_request(0, 0); subscription_request e2ap_sub_req; + + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); // generate the request pdu - res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he); + res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he); if(! res){ mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__); - return false; + return SUBSCR_ERR_ENCODE; } // put entry in request table { std::lock_guard lock(*(_data_lock.get())); - res = add_request_entry(he.get_request_id(), request_pending); + res = add_request_entry(sub_id, request_pending); if(! res){ - mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id()); - return false; + mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } } @@ -158,92 +196,95 @@ bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscript res = tx(TxCode, buf_len, buffer); if (!res){ // clear state - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id()); - return false; + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); + return SUBSCR_ERR_TX; }; // record time stamp .. auto start = std::chrono::system_clock::now(); + res = SUBSCR_ERR_UNKNOWN; while(1){ - - // release lock and wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // we have woken and acquired data_lock // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == request_success){ - // retreive the subscription response and clear queue - response = subscription_responses[he.get_request_id()]; - - // clear state - delete_request_entry(he.get_request_id()); - - // release data lock - _local_lock.unlock(); - mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id()); - + // retreive & store the subscription response + response = subscription_responses[sub_id]; + mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_SUCCESS; break; } if (status == request_pending){ - // woken up spuriously or timed out auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; - if (_time_out_flag && f > _num_retries * _time_out){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id()); - - // Release data lock - _local_lock.unlock(); - return false; + if ( f > _num_retries * _time_out){ + mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_TIMEOUT; + break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); continue; } } - // if we are here, some spurious - // status obtained or request failed . we return false - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status); - delete_request_entry(he.get_request_id()); - - // release data lock - _local_lock.unlock(); + if(status == request_failed){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_FAIL; + break; + } - return false; + if (status == request_duplicate){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_DUPLICATE; + break; + + } + // if we are here, some spurious + // status obtained or request failed . we return appropriate error code + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); + res = SUBSCR_ERR_UNKNOWN; + break; }; - + + delete_request_entry(sub_id); - return true; + // release data lock + _local_lock.unlock(); + std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl; + return res; }; template -bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ + + int res; + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); - bool res; - // First check if we have this subscription - if(! is_subscription_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id()); - return false; + if(! is_subscription_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_MISSING; } // Also check if such a request is queued - if (is_request_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id()); - return false; + if (is_request_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } subscription_delete e2ap_sub_req_del; @@ -255,14 +296,17 @@ bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, sub res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he); if(! res){ mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str()); - return false; + return SUBSCR_ERR_ENCODE; } // put entry in request table - res = add_request_entry(he.get_request_id(), delete_request_pending); - if(! res){ - mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id()); - return false; + { + std::lock_guard lock(*(_data_lock.get())); + res = add_request_entry(sub_id, delete_request_pending); + if(!res){ + mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); + return SUBSCR_ERR_DUPLICATE; + } } std::unique_lock _local_lock(*(_data_lock.get())); @@ -271,26 +315,26 @@ bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, sub res = tx(TxCode, buf_len, buffer); if (!res){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id()); - return false; + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_TX; }; // record time stamp .. auto start = std::chrono::system_clock::now(); + res = SUBSCR_ERR_UNKNOWN; while(1){ // wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == delete_request_success){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id()); - _local_lock.unlock(); + mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_SUCCESS; break; } @@ -299,29 +343,39 @@ bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, sub auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; - if (_time_out_flag && f > _num_retries * _time_out){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id()); - _local_lock.unlock(); - return false; + if (f > _num_retries * _time_out){ + mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_TIMEOUT; + break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); } continue; } - // if we are hear, some spurious + if(status == delete_request_failed){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_FAIL; + break; + } + + // if we are here, some spurious // status obtained. we return false - delete_request_entry(he.get_request_id()); - _local_lock.unlock(); - return false; + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); + res = SUBSCR_ERR_UNKNOWN; + break; }; - return true; + delete_request_entry(sub_id); + + // release data lock + _local_lock.unlock(); + std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl; + return res; }; #endif