From: sjana Date: Tue, 6 Oct 2020 21:29:57 +0000 (-0400) Subject: Changes to Subscription Handler X-Git-Tag: 1.1.0~7 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=ric-app%2Fhw.git;a=commitdiff_plain;h=21294a186445672e15db465618064642443cc39a Changes to Subscription Handler Issue-ID: RICAPP-148 Signed-off-by: sjana Change-Id: Ifa2a521bca16290219a6ef294bb96d1bc1f31195 --- diff --git a/src/hw_xapp_main.cc b/src/hw_xapp_main.cc index 1771797..a4a5eef 100644 --- a/src/hw_xapp_main.cc +++ b/src/hw_xapp_main.cc @@ -60,29 +60,36 @@ int main(int argc, char *argv[]){ //Create Subscription Handler if Xapp deals with Subscription. - std::unique_ptr sub_handler = std::make_unique(); + //std::unique_ptr sub_handler = std::make_unique(); + + SubscriptionHandler sub_handler; //create HelloWorld Xapp Instance. std::unique_ptr hw_xapp; hw_xapp = std::make_unique(std::ref(config),std::ref(*rmr)); mdclog_write(MDCLOG_INFO, "Created Hello World Xapp Instance"); - - sleep(1); //Startup E2 subscription and A1 policy - hw_xapp->startup(std::ref(*sub_handler)); + hw_xapp->startup(sub_handler); + + sleep(10); //start listener threads and register message handlers. int num_threads = std::stoi(config[XappSettings::SettingName::THREADS]); mdclog_write(MDCLOG_INFO, "Starting Listener Threads. Number of Workers = %d", num_threads); - std::unique_ptr mp_handler = std::make_unique(config[XappSettings::SettingName::XAPP_ID]); + std::unique_ptr mp_handler = std::make_unique(config[XappSettings::SettingName::XAPP_ID], sub_handler); + hw_xapp->start_xapp_receiver(std::ref(*mp_handler)); sleep(1); - //xapp->shutdown(); + + + //hw_xapp->Run() //for spinning multiple receiving threads. + + //hw_xapp->shutdown(); while(1){ sleep(1); diff --git a/src/xapp-mgmt/subs_mgmt.cc b/src/xapp-mgmt/subs_mgmt.cc index 58da73c..dca9412 100644 --- a/src/xapp-mgmt/subs_mgmt.cc +++ b/src/xapp-mgmt/subs_mgmt.cc @@ -21,10 +21,9 @@ * Author: Ashwin Shridharan, Shraboni Jana */ #include "subs_mgmt.hpp" - +#include #include - SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds):_time_out(std::chrono::seconds(timeout_seconds)){ _data_lock = std::make_unique(); _cv = std::make_unique(); @@ -67,10 +66,10 @@ bool SubscriptionHandler::delete_request_entry(transaction_identifier id){ if (search != status_table.end()){ status_table.erase(search); - mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %s",id.c_str()); + mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %d",id); return true; } - mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %s",id.c_str()); + mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %d",id); return false; }; @@ -79,48 +78,63 @@ bool SubscriptionHandler::delete_request_entry(transaction_identifier id){ bool SubscriptionHandler::set_request_status(transaction_identifier id, transaction_status status){ // change status of a request only if it exists. - auto search = status_table.find(id); - if(search != status_table.end()){ - status_table[id] = status; - return true; - } - + for(auto &it:status_table){ + if(strcmp(it.first.c_str(), id.c_str())==0) { + it.second = status; + return true; + } + } return false; - + }; -int const SubscriptionHandler::get_request_status(transaction_identifier id){ - auto search = status_table.find(id); - if (search == status_table.end()){ - return -1; - } - - return search->second; +int SubscriptionHandler::get_request_status(transaction_identifier id){ + + for(auto it:status_table){ + if(strcmp(it.first.c_str(), id.c_str())==0) { + return it.second; + } + } + + + return -1; } bool SubscriptionHandler::is_request_entry(transaction_identifier id){ - auto search = status_table.find(id); - if (search != status_table.end()) - return true; - else + for(auto it:status_table){ + if(strcmp(it.first.c_str(), id.c_str())==0) { + return true; + } + } return false; } + + + // Handles subscription responses void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id){ + // Make This Thread sleep for 1 Second + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + { + std::unique_lock _local_lock(*(_data_lock.get())); + mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s WAS: %d",id.c_str(),this->get_request_status(id)); - bool res; - std::cout << "In Manage subscription" << std::endl; + //from the message type we can know if its a success/failure etc. + if(message_type==RIC_SUB_RESP) + this->set_request_status(id, request_success); - // wake up all waiting users ... - if(is_request_entry(id)){ - std::cout << "In Manage subscription" << std::endl; - set_request_status(id, request_success); - _cv.get()->notify_all(); - } + if(message_type==RIC_SUB_FAILURE) + this->set_request_status(id,request_failed); + + mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s IS: %d",id.c_str(),this->get_request_status(id)); + + //this->print_subscription_status(); + } + //_cv.get()->notify_all(); } diff --git a/src/xapp-mgmt/subs_mgmt.hpp b/src/xapp-mgmt/subs_mgmt.hpp index 4cc328a..c500827 100644 --- a/src/xapp-mgmt/subs_mgmt.hpp +++ b/src/xapp-mgmt/subs_mgmt.hpp @@ -31,14 +31,17 @@ #include #include #include +#include +#include +#include #include #include -#include +#include -#include "../xapp-asn/e2ap/subscription_delete_request.hpp" -#include "../xapp-asn/e2ap/subscription_delete_response.hpp" -#include "../xapp-asn/e2ap/subscription_request.hpp" -#include "../xapp-asn/e2ap/subscription_response.hpp" +#include "subscription_delete_request.hpp" +#include "subscription_delete_response.hpp" +#include "subscription_request.hpp" +#include "subscription_response.hpp" #define SUBSCR_SUCCESS 1 #define SUBSCR_ERR_TX -1 @@ -87,12 +90,10 @@ typedef enum { request_pending = 1, request_success, request_failed, - delete_request_pending, - delete_request_success, - delete_request_failed, request_duplicate }Subscription_Status_Types; + using transaction_identifier = std::string; using transaction_status = Subscription_Status_Types; @@ -100,7 +101,7 @@ class SubscriptionHandler { public: - SubscriptionHandler(unsigned int timeout_seconds = 10); + SubscriptionHandler(unsigned int timeout_seconds = 30); template int manage_subscription_request(transaction_identifier, AppTransmitter &&); @@ -110,13 +111,15 @@ public: void manage_subscription_response(int message_type, transaction_identifier id); - int const get_request_status(transaction_identifier); + int get_request_status(transaction_identifier); bool set_request_status(transaction_identifier, transaction_status); bool is_request_entry(transaction_identifier); void set_timeout(unsigned int); void clear(void); void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;}; + void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}}; + private: bool add_request_entry(transaction_identifier, transaction_status); @@ -138,6 +141,7 @@ private: template bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){ + mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str()); // add entry in hash table if it does not exist auto search = trans_table.find(id); @@ -187,70 +191,14 @@ int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_ // record time stamp .. auto start = std::chrono::system_clock::now(); - res = SUBSCR_ERR_UNKNOWN; - - - while(1){ - // release lock and wait to be woken up - _cv.get()->wait_for(_local_lock, _time_out); - - // we have woken and acquired data_lock - // check status and return appropriate object - int status = get_request_status(rmr_trans_id); - - if (status == request_success){ - mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id.c_str()); - res = SUBSCR_SUCCESS; - break; - } - - if (status == request_pending){ - // woken up spuriously or timed out - auto end = std::chrono::system_clock::now(); - std::chrono::duration f = end - start; - - if ( f > _time_out){ - - mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id.c_str()); - - res = SUBSCR_ERR_TIMEOUT; - //sunny side scenario. assuming subscription response is received. - //res = SUBSCR_SUCCESS; - break; - } - else{ - mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id.c_str()); - continue; - } - - } - - if(status == request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id); - res = SUBSCR_ERR_FAIL; - break; - } - - if (status == request_duplicate){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id); - res = SUBSCR_ERR_DUPLICATE; - break; - - } + std::chrono::milliseconds t_out(_time_out); - // if we are here, some spurious - // status obtained or request failed . we return appropriate error code - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status); - res = SUBSCR_ERR_UNKNOWN; - break; - }; + //the wait functionality has been removed. - delete_request_entry(rmr_trans_id); - // release data lock _local_lock.unlock(); - // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl; - return res; + // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl; + return res; }; #endif diff --git a/test/test_subs.h b/test/test_subs.h index 2c33d55..d831c96 100644 --- a/test/test_subs.h +++ b/test/test_subs.h @@ -60,5 +60,37 @@ TEST(SUBSCRIPTION, Request){ } +//create a MOck e2term +TEST (MOCK, E2TERM){ + + const char* meid = "test1"; + const char* sub_id = "sub1"; + //Send the Subscription Response. + xapp_rmr_header hdr; + hdr.message_type = RIC_SUB_RESP; + clock_gettime(CLOCK_REALTIME, &(hdr.ts)); + const char* strMsg = "Subscription Response for MEID: test1"; + hdr.payload_length = strlen(strMsg); + strcpy((char*)hdr.meid, meid); + strcpy((char*)hdr.sid,sub_id); + int total_num_msgs = 2; + int num_attempts = 10; + + std::unique_ptr rmr; + rmr = std::make_unique("4591",num_attempts); + rmr->xapp_rmr_init(true); + + XappSettings config; + + std::unique_ptr hw_xapp = std::make_unique(std::ref(config),std::ref(*rmr)); + while(1){ + bool res_msg = rmr->xapp_rmr_send(&hdr,(void*)strMsg); + if(res_msg){ + mdclog_write(MDCLOG_INFO, "Message Sent Successfully"); + break; + } + sleep(10); + } +} #endif /* TEST_TEST_SUBS_H_ */