X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2FE2AP-c%2Fsubscription%2Fsubscription_handler.hpp;fp=src%2FE2AP-c%2Fsubscription%2Fsubscription_handler.hpp;h=0dd78a8c67b3464d5e11186217c78f693dc0276e;hb=4e545a8b013e60f2ff59254cb3fe435012d8fe5a;hp=fcfd77f756ffee13561ea012b535f8b2b529e426;hpb=82ba4b9999ca8e09461315a919b36a66641a6c7d;p=ric-app%2Fadmin.git diff --git a/src/E2AP-c/subscription/subscription_handler.hpp b/src/E2AP-c/subscription/subscription_handler.hpp index fcfd77f..0dd78a8 100644 --- a/src/E2AP-c/subscription/subscription_handler.hpp +++ b/src/E2AP-c/subscription/subscription_handler.hpp @@ -26,11 +26,13 @@ #ifndef SUBSCRIPTION_HANDLER #define SUBSCRIPTION_HANDLER +#include #include #include #include #include #include +#include #include #include @@ -58,39 +60,73 @@ typedef enum { request_duplicate }Subscription_Status_Types; +using subscription_identifier = std::tuple; /* Class to process subscription related messages - each subscription request is assigned a unique internally -generated request id for tracking purposes. this is because -the e2 subscription request does not carry any gnodeb id information + The class provides mechanism to send and process + subscriptions and subscription deletes. + + NOTE 1: It is currently unclear how an xAPP should identify a + subscription request/response pair uniquely in the absence/presence of + subscription manager. Ideally, the subscription manager should be + transperent to the xAPP but that may not be the case, i.e the + subscription manager may the subscription request id fields. + The xAPP needs to identify uniquely not just the subscription response, but + also when it needs to send a delete for the corresponding request. + From that perspective, the fields present in both the subscription response and + a delete request are the RICrequestId and RANfunctionId. However, the subscription manager + may require that the RICrequestId fields be set to a specific value (TBD). Hence + for current purposes, a RIC subscription request is uniquely identified by the + tuple . This is not ideal, since potentially the same RANfunctionID + may be subscribed to in different modes, but for now this is the constraint. + + + NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID. + However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission + medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR + related semantics, but simply accepts a function to send the request/delete request that accepts a signature + Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. + + + NOTE 3: The subscription handler is thread-safe, i.e multiple elements + can request subscriptions/subscription deletes from multiple threads. However + this does not preclude conflict if multiple threads are trying to make + subscriptions based on the same triplet (in which cases, results will be internally + consistent, but may yield errors to calling agent). */ -class subscription_handler { +struct subscription_hasher { + size_t operator()(const subscription_identifier & key) const { + return std::hash{}(std::get<0>(key) + std::to_string(std::get<1>(key))); + } +}; +class subscription_handler { + public: - subscription_handler(void); - subscription_handler(unsigned int, unsigned int); + + subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2); void init(void); + template - int RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&); + int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); template - int RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&); - - - void Response(int, unsigned char *, int); - int const get_request_status(int); - subscription_response_helper * const get_subscription(int); + int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); + void Response(int, unsigned char *, int, const char *); + int const get_request_status(subscription_identifier); + subscription_response_helper * const get_subscription(subscription_identifier); + unsigned int get_next_id(void); void set_timeout(unsigned int); void set_num_retries(unsigned int); - bool is_subscription_entry(int); - bool is_request_entry(int); - + bool is_subscription_entry(subscription_identifier); + bool is_request_entry(subscription_identifier); + void get_subscription_keys(std::vector &); void clear(void); size_t num_pending(void) const; size_t num_complete(void) const ; @@ -99,16 +135,17 @@ public: private: - bool add_request_entry(int, int); - bool set_request_status(int, int); - bool delete_request_entry(int); + + bool add_request_entry(subscription_identifier, int); + bool set_request_status(subscription_identifier, int); + bool delete_request_entry(subscription_identifier); - bool get_subscription_entry(int, int); - bool add_subscription_entry(int, subscription_response_helper &he); - bool delete_subscription_entry(int); + bool get_subscription_entry(subscription_identifier); + bool add_subscription_entry(subscription_identifier, subscription_response_helper &he); + bool delete_subscription_entry(subscription_identifier); - std::unordered_map requests_table; - std::unordered_map subscription_responses; // stores list of successful subscriptions + std::unordered_map requests_table; + std::unordered_map subscription_responses; // stores list of successful subscriptions std::unique_ptr _data_lock; std::unique_ptr _cv; @@ -120,19 +157,19 @@ private: }; template -int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; unsigned char buffer[512]; size_t buf_len = 512; - // get a new unique request id ... - unsigned int new_req_id = get_next_id(); - mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id); - he.set_request(new_req_id, he.get_req_seq()); - - + // As per current design, request id and request sequence number + // must be set to zero ... + he.set_request(0, 0); subscription_request e2ap_sub_req; + + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); // generate the request pdu res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he); @@ -144,10 +181,10 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript // put entry in request table { std::lock_guard lock(*(_data_lock.get())); - res = add_request_entry(he.get_request_id(), request_pending); + res = add_request_entry(sub_id, request_pending); if(! res){ - mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue", __FILE__, __LINE__, he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } } @@ -159,8 +196,8 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript res = tx(TxCode, buf_len, buffer); if (!res){ // clear state - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__, he.get_request_id()); + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); return SUBSCR_ERR_TX; }; @@ -170,80 +207,84 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript res = SUBSCR_ERR_UNKNOWN; while(1){ - - // release lock and wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // we have woken and acquired data_lock // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == request_success){ - // retreive & store the subscription response - response = subscription_responses[he.get_request_id()]; - mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id()); + response = subscription_responses[sub_id]; + mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } if (status == request_pending){ - // woken up spuriously or timed out auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; if ( f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; - std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl; break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); continue; } } if(status == request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } + if (status == request_duplicate){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_DUPLICATE; + break; + + } + // if we are here, some spurious - // status obtained or request failed . we return false - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status); + // status obtained or request failed . we return appropriate error code + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; - delete_request_entry(he.get_request_id()); + delete_request_entry(sub_id); // release data lock _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl; + std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl; return res; }; template -int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; - + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); + // First check if we have this subscription - if(! is_subscription_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id()); + if(! is_subscription_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_MISSING; } // Also check if such a request is queued - if (is_request_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + if (is_request_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } subscription_delete e2ap_sub_req_del; @@ -259,10 +300,13 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su } // put entry in request table - res = add_request_entry(he.get_request_id(), delete_request_pending); - if(! res){ - mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + { + std::lock_guard lock(*(_data_lock.get())); + res = add_request_entry(sub_id, delete_request_pending); + if(!res){ + mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); + return SUBSCR_ERR_DUPLICATE; + } } std::unique_lock _local_lock(*(_data_lock.get())); @@ -271,8 +315,8 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su res = tx(TxCode, buf_len, buffer); if (!res){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id()); + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_TX; }; @@ -287,9 +331,9 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su _cv.get()->wait_for(_local_lock, _time_out); // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == delete_request_success){ - mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } @@ -300,36 +344,37 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su std::chrono::duration f = end - start; if (f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); } continue; } if(status == delete_request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } + // if we are here, some spurious // status obtained. we return false - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; - delete_request_entry(he.get_request_id()); + delete_request_entry(sub_id); // release data lock _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl; + std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl; return res; };