X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fxapp-mgmt%2Fsubs_mgmt.hpp;h=5d113ed76b5cdf751a8b0d0665d057ca2f44a7a8;hb=28b894594573ab1e7087ed8fb50b208d7b135b07;hp=77458d34cbe1542dc844fea338af59bd2aa236f7;hpb=6df19a4dacb4fcb6edb35a32af9c8f5c07c95e37;p=ric-app%2Fhw.git diff --git a/src/xapp-mgmt/subs_mgmt.hpp b/src/xapp-mgmt/subs_mgmt.hpp index 77458d3..5d113ed 100644 --- a/src/xapp-mgmt/subs_mgmt.hpp +++ b/src/xapp-mgmt/subs_mgmt.hpp @@ -1,6 +1,6 @@ /* ================================================================================== - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -31,304 +31,172 @@ #include #include #include +#include +#include +#include #include #include +#include +#include "e2ap_action.hpp" +#include "e2ap_subscription_request.hpp" +#include "e2sm_subscription.hpp" + +#define SUBSCR_SUCCESS 1 +#define SUBSCR_ERR_TX -1 +#define SUBSCR_ERR_TIMEOUT -2 +#define SUBSCR_ERR_FAIL -3 +#define SUBSCR_ERR_UNKNOWN -4 +#define SUBSCR_ERR_DUPLICATE -5 -#include "subscription_delete_request.hpp" -#include "subscription_delete_response.hpp" -#include "subscription_request.hpp" -#include "subscription_response.hpp" +using namespace std; -#define SUBSCR_SUCCESS 0 -#define SUBSCR_ERR_TX 1 -#define SUBSCR_ERR_TIMEOUT 2 -#define SUBSCR_ERR_FAIL 3 -#define SUBSCR_ERR_UNKNOWN 4 -#define SUBSCR_ERR_DUPLICATE 5 -#define SUBSCR_ERR_ENCODE 6 -#define SUBSCR_ERR_MISSING 7 +class TransmitterBase +{ +public: + virtual ~TransmitterBase() {} -using namespace std; + template + const T& getParam() const; //to be implemented after Parameter + + template + void setParam(const U& rhs); //to be implemented after Parameter +}; + +template +class Transmitter : public TransmitterBase +{ +public: + Transmitter(const T& tx) :obj(tx) {} + const T& getParam() const {return obj;} + void setParam(const T& tx) {obj=tx;} +private: + T obj; +}; + +//Here's the trick: dynamic_cast rather than virtual +template const T& TransmitterBase::getParam() const +{ + return dynamic_cast&>(*this).getParam(); +} +template void TransmitterBase::setParam(const U& rhs) +{ + dynamic_cast&>(*this).setParam(rhs); + return; +} typedef enum { request_pending = 1, request_success, request_failed, - delete_request_pending, - delete_request_success, - delete_request_failed, request_duplicate }Subscription_Status_Types; -using subscription_identifier = std::tuple; -struct subscription_hasher { - size_t operator()(const subscription_identifier & key) const { - return std::hash{}(std::get<0>(key) + std::to_string(std::get<1>(key))); - } -}; +using transaction_identifier = std::string; +using transaction_status = Subscription_Status_Types; class SubscriptionHandler { public: - SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2); + SubscriptionHandler(unsigned int timeout_seconds = 30); - void init(void); + template + int manage_subscription_request(transaction_identifier, AppTransmitter &&); - template - int request_subscription(std::string, int , Transmitter &&); + template + int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&); - template - int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); + void manage_subscription_response(int message_type, transaction_identifier id); - void Response(int, unsigned char *, int, const char *); - int const get_request_status(subscription_identifier); - subscription_response_helper * const get_subscription(subscription_identifier); - - unsigned int get_next_id(void); + int get_request_status(transaction_identifier); + bool set_request_status(transaction_identifier, transaction_status); + bool is_request_entry(transaction_identifier); void set_timeout(unsigned int); - void set_num_retries(unsigned int); - - bool is_subscription_entry(subscription_identifier); - bool is_request_entry(subscription_identifier); - void get_subscription_keys(std::vector &); void clear(void); - size_t num_pending(void) const; - size_t num_complete(void) const ; + void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;}; + void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}}; - private: - - - bool add_request_entry(subscription_identifier, int); - bool set_request_status(subscription_identifier, int); - bool delete_request_entry(subscription_identifier); - - bool get_subscription_entry(subscription_identifier); - bool add_subscription_entry(subscription_identifier, subscription_response_helper &he); - bool delete_subscription_entry(subscription_identifier); - - std::unordered_map requests_table; - std::unordered_map subscription_responses; // stores list of successful subscriptions + bool add_request_entry(transaction_identifier, transaction_status); + bool delete_request_entry(transaction_identifier); + + template + bool add_transmitter_entry(transaction_identifier, AppTransmitter&&); + + std::unordered_map trans_table; + std::unordered_map status_table; + std::unique_ptr _data_lock; std::unique_ptr _cv; std::chrono::seconds _time_out; - unsigned int _num_retries = 2; - unsigned int unique_request_id = 0; + bool _ignore_subs_resp = false; }; -template -int SubscriptionHandler::request_subscription(std::string node_id, int msgcode, Transmitter && tx){ +template +bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){ + mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str()); - // generate subscription identifier - subscription_identifier sub_id = std::make_tuple (node_id, 0); //0 is the function id which is hardcoded, which should come from rnib + // add entry in hash table if it does not exist + auto search = trans_table.find(id); + if(search != trans_table.end()){ + return false; + } + + Transmitter tptr(trans); + trans_table[id] = tptr; + return true; +}; - bool res; +//this will work for both sending subscription request and subscription delete request. +//The handler is oblivious of the message content and follows the transaction id. +template +int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){ + int res; // put entry in request table { std::lock_guard lock(*(_data_lock.get())); - res = add_request_entry(sub_id, request_pending); + + res = add_request_entry(rmr_trans_id, request_pending); if(! res){ - mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id); return SUBSCR_ERR_DUPLICATE; } } + // acquire lock ... std::unique_lock _local_lock(*(_data_lock.get())); - // Send the message - res = tx(); + bool flg = tx(); - if (!res){ + if (!flg){ // clear state - delete_request_entry(sub_id); - mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); + delete_request_entry(rmr_trans_id); + mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() ); return SUBSCR_ERR_TX; - }; - - - // record time stamp .. - auto start = std::chrono::system_clock::now(); - res = SUBSCR_ERR_UNKNOWN; - - while(1){ - // release lock and wait to be woken up - _cv.get()->wait_for(_local_lock, _time_out); - - // we have woken and acquired data_lock - // check status and return appropriate object - - int status = get_request_status(sub_id); - - if (status == request_success){ - // retreive & store the subscription response (why?) - // response = subscription_responses[sub_id]; - mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_SUCCESS; - break; - } - - if (status == request_pending){ - // woken up spuriously or timed out - auto end = std::chrono::system_clock::now(); - std::chrono::duration f = end - start; - - if ( f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_ERR_TIMEOUT; - break; - } - else{ - mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - continue; - } - } - - if(status == request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_ERR_FAIL; - break; - } - - if (status == request_duplicate){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_ERR_DUPLICATE; - break; + } else { + mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() ); + add_transmitter_entry(rmr_trans_id, tx); - } - - // if we are here, some spurious - // status obtained or request failed . we return appropriate error code - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); - res = SUBSCR_ERR_UNKNOWN; - break; - }; - - delete_request_entry(sub_id); - - // release data lock - _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl; - return res; -}; - - -/*template -int SubscriptionHandler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ - - int res; - // generate subscription identifier - subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); - - // First check if we have this subscription - if(! is_subscription_entry(sub_id)){ - mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - return SUBSCR_ERR_MISSING; } - // Also check if such a request is queued - if (is_request_entry(sub_id)){ - mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - return SUBSCR_ERR_DUPLICATE; - } - - subscription_delete e2ap_sub_req_del; - - // generate the delete request pdu - unsigned char buffer[128]; - size_t buf_len = 128; - - res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he); - if(! res){ - mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str()); - return SUBSCR_ERR_ENCODE; - } - - // put entry in request table - { - std::lock_guard lock(*(_data_lock.get())); - res = add_request_entry(sub_id, delete_request_pending); - if(!res){ - mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); - return SUBSCR_ERR_DUPLICATE; - } - } - - std::unique_lock _local_lock(*(_data_lock.get())); - - // Send the message - res = tx(TxCode, buf_len, buffer); - - if (!res){ - delete_request_entry(sub_id); - mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - return SUBSCR_ERR_TX; - }; - - // record time stamp .. auto start = std::chrono::system_clock::now(); + std::chrono::milliseconds t_out(_time_out); - res = SUBSCR_ERR_UNKNOWN; - while(1){ - - // wait to be woken up - _cv.get()->wait_for(_local_lock, _time_out); + //the wait functionality has been removed. - // check status and return appropriate object - int status = get_request_status(sub_id); - if (status == delete_request_success){ - mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_SUCCESS; - break; - } - - if (status == delete_request_pending){ - // woken up spuriously or timed out - auto end = std::chrono::system_clock::now(); - std::chrono::duration f = end - start; - - if (f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_ERR_TIMEOUT; - break; - } - else{ - mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - } - - continue; - } - - if(status == delete_request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); - res = SUBSCR_ERR_FAIL; - break; - } - // if we are here, some spurious - // status obtained. we return false - - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); - res = SUBSCR_ERR_UNKNOWN; - break; - - }; - - delete_request_entry(sub_id); - - // release data lock _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl; - return res; -};*/ + // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl; + return res; +}; #endif