//Create Subscription Handler if Xapp deals with Subscription.
- std::unique_ptr<SubscriptionHandler> sub_handler = std::make_unique<SubscriptionHandler>();
+ //std::unique_ptr<SubscriptionHandler> sub_handler = std::make_unique<SubscriptionHandler>();
+
+ SubscriptionHandler sub_handler;
//create HelloWorld Xapp Instance.
std::unique_ptr<Xapp> hw_xapp;
hw_xapp = std::make_unique<Xapp>(std::ref(config),std::ref(*rmr));
mdclog_write(MDCLOG_INFO, "Created Hello World Xapp Instance");
-
- sleep(1);
//Startup E2 subscription and A1 policy
- hw_xapp->startup(std::ref(*sub_handler));
+ hw_xapp->startup(sub_handler);
+
+ sleep(10);
//start listener threads and register message handlers.
int num_threads = std::stoi(config[XappSettings::SettingName::THREADS]);
mdclog_write(MDCLOG_INFO, "Starting Listener Threads. Number of Workers = %d", num_threads);
- std::unique_ptr<XappMsgHandler> mp_handler = std::make_unique<XappMsgHandler>(config[XappSettings::SettingName::XAPP_ID]);
+ std::unique_ptr<XappMsgHandler> mp_handler = std::make_unique<XappMsgHandler>(config[XappSettings::SettingName::XAPP_ID], sub_handler);
+
hw_xapp->start_xapp_receiver(std::ref(*mp_handler));
sleep(1);
- //xapp->shutdown();
+
+
+ //hw_xapp->Run() //for spinning multiple receiving threads.
+
+ //hw_xapp->shutdown();
while(1){
sleep(1);
* Author: Ashwin Shridharan, Shraboni Jana
*/
#include "subs_mgmt.hpp"
-
+#include <thread>
#include <errno.h>
-
SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds):_time_out(std::chrono::seconds(timeout_seconds)){
_data_lock = std::make_unique<std::mutex>();
_cv = std::make_unique<std::condition_variable>();
if (search != status_table.end()){
status_table.erase(search);
- mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %s",id.c_str());
+ mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %d",id);
return true;
}
- mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %s",id.c_str());
+ mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %d",id);
return false;
};
bool SubscriptionHandler::set_request_status(transaction_identifier id, transaction_status status){
// change status of a request only if it exists.
- auto search = status_table.find(id);
- if(search != status_table.end()){
- status_table[id] = status;
- return true;
- }
-
+ for(auto &it:status_table){
+ if(strcmp(it.first.c_str(), id.c_str())==0) {
+ it.second = status;
+ return true;
+ }
+ }
return false;
-
+
};
-int const SubscriptionHandler::get_request_status(transaction_identifier id){
- auto search = status_table.find(id);
- if (search == status_table.end()){
- return -1;
- }
-
- return search->second;
+int SubscriptionHandler::get_request_status(transaction_identifier id){
+
+ for(auto it:status_table){
+ if(strcmp(it.first.c_str(), id.c_str())==0) {
+ return it.second;
+ }
+ }
+
+
+ return -1;
}
bool SubscriptionHandler::is_request_entry(transaction_identifier id){
- auto search = status_table.find(id);
- if (search != status_table.end())
- return true;
- else
+ for(auto it:status_table){
+ if(strcmp(it.first.c_str(), id.c_str())==0) {
+ return true;
+ }
+ }
return false;
}
+
+
+
// Handles subscription responses
void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id){
+ // Make This Thread sleep for 1 Second
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ {
+ std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+ mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s WAS: %d",id.c_str(),this->get_request_status(id));
- bool res;
- std::cout << "In Manage subscription" << std::endl;
+ //from the message type we can know if its a success/failure etc.
+ if(message_type==RIC_SUB_RESP)
+ this->set_request_status(id, request_success);
- // wake up all waiting users ...
- if(is_request_entry(id)){
- std::cout << "In Manage subscription" << std::endl;
- set_request_status(id, request_success);
- _cv.get()->notify_all();
- }
+ if(message_type==RIC_SUB_FAILURE)
+ this->set_request_status(id,request_failed);
+
+ mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s IS: %d",id.c_str(),this->get_request_status(id));
+
+ //this->print_subscription_status();
+ }
+ //_cv.get()->notify_all();
}
#include <mutex>
#include <condition_variable>
#include <unordered_map>
+#include <algorithm>
+#include <ctime>
+#include <unistd.h>
#include <chrono>
#include <tuple>
-#include <string>
+#include <rmr/RIC_message_types.h>
-#include "../xapp-asn/e2ap/subscription_delete_request.hpp"
-#include "../xapp-asn/e2ap/subscription_delete_response.hpp"
-#include "../xapp-asn/e2ap/subscription_request.hpp"
-#include "../xapp-asn/e2ap/subscription_response.hpp"
+#include "subscription_delete_request.hpp"
+#include "subscription_delete_response.hpp"
+#include "subscription_request.hpp"
+#include "subscription_response.hpp"
#define SUBSCR_SUCCESS 1
#define SUBSCR_ERR_TX -1
request_pending = 1,
request_success,
request_failed,
- delete_request_pending,
- delete_request_success,
- delete_request_failed,
request_duplicate
}Subscription_Status_Types;
+
using transaction_identifier = std::string;
using transaction_status = Subscription_Status_Types;
public:
- SubscriptionHandler(unsigned int timeout_seconds = 10);
+ SubscriptionHandler(unsigned int timeout_seconds = 30);
template <typename AppTransmitter>
int manage_subscription_request(transaction_identifier, AppTransmitter &&);
void manage_subscription_response(int message_type, transaction_identifier id);
- int const get_request_status(transaction_identifier);
+ int get_request_status(transaction_identifier);
bool set_request_status(transaction_identifier, transaction_status);
bool is_request_entry(transaction_identifier);
void set_timeout(unsigned int);
void clear(void);
void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
+ void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}};
+
private:
bool add_request_entry(transaction_identifier, transaction_status);
template <typename AppTransmitter>
bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
+ mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str());
// add entry in hash table if it does not exist
auto search = trans_table.find(id);
// record time stamp ..
auto start = std::chrono::system_clock::now();
- res = SUBSCR_ERR_UNKNOWN;
-
-
- while(1){
- // release lock and wait to be woken up
- _cv.get()->wait_for(_local_lock, _time_out);
-
- // we have woken and acquired data_lock
- // check status and return appropriate object
- int status = get_request_status(rmr_trans_id);
-
- if (status == request_success){
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id.c_str());
- res = SUBSCR_SUCCESS;
- break;
- }
-
- if (status == request_pending){
- // woken up spuriously or timed out
- auto end = std::chrono::system_clock::now();
- std::chrono::duration<double> f = end - start;
-
- if ( f > _time_out){
-
- mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id.c_str());
-
- res = SUBSCR_ERR_TIMEOUT;
- //sunny side scenario. assuming subscription response is received.
- //res = SUBSCR_SUCCESS;
- break;
- }
- else{
- mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id.c_str());
- continue;
- }
-
- }
-
- if(status == request_failed){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
- res = SUBSCR_ERR_FAIL;
- break;
- }
-
- if (status == request_duplicate){
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
- res = SUBSCR_ERR_DUPLICATE;
- break;
-
- }
+ std::chrono::milliseconds t_out(_time_out);
- // if we are here, some spurious
- // status obtained or request failed . we return appropriate error code
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
- res = SUBSCR_ERR_UNKNOWN;
- break;
- };
+ //the wait functionality has been removed.
- delete_request_entry(rmr_trans_id);
- // release data lock
_local_lock.unlock();
- // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
- return res;
+ // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
+ return res;
};
#endif
}
+//create a MOck e2term
+TEST (MOCK, E2TERM){
+
+ const char* meid = "test1";
+ const char* sub_id = "sub1";
+ //Send the Subscription Response.
+ xapp_rmr_header hdr;
+ hdr.message_type = RIC_SUB_RESP;
+ clock_gettime(CLOCK_REALTIME, &(hdr.ts));
+ const char* strMsg = "Subscription Response for MEID: test1";
+ hdr.payload_length = strlen(strMsg);
+ strcpy((char*)hdr.meid, meid);
+ strcpy((char*)hdr.sid,sub_id);
+ int total_num_msgs = 2;
+ int num_attempts = 10;
+
+ std::unique_ptr<XappRmr> rmr;
+ rmr = std::make_unique<XappRmr>("4591",num_attempts);
+ rmr->xapp_rmr_init(true);
+
+ XappSettings config;
+
+ std::unique_ptr<Xapp> hw_xapp = std::make_unique<Xapp>(std::ref(config),std::ref(*rmr));
+ while(1){
+ bool res_msg = rmr->xapp_rmr_send(&hdr,(void*)strMsg);
+ if(res_msg){
+ mdclog_write(MDCLOG_INFO, "Message Sent Successfully");
+ break;
+ }
+ sleep(10);
+ }
+}
#endif /* TEST_TEST_SUBS_H_ */