X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=Bouncer%2Fsrc%2Fxapp-mgmt%2Fsubs_mgmt.hpp;fp=Bouncer%2Fsrc%2Fxapp-mgmt%2Fsubs_mgmt.hpp;h=c50082780cf3d81caf37aec227afbc8a6bb02aa2;hb=ff20129c8f517cca6e0b4de6544ff64aebe7c171;hp=0000000000000000000000000000000000000000;hpb=35882dccfbc1b35af0e5704e14e0ecb9eba0f52a;p=ric-app%2Fbouncer.git diff --git a/Bouncer/src/xapp-mgmt/subs_mgmt.hpp b/Bouncer/src/xapp-mgmt/subs_mgmt.hpp new file mode 100644 index 0000000..c500827 --- /dev/null +++ b/Bouncer/src/xapp-mgmt/subs_mgmt.hpp @@ -0,0 +1,204 @@ +/* +================================================================================== + Copyright (c) 2019-2020 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ +/* + * subs_mgmt.hpp + * Created on: 2019 + * Author: Ashwin Shridharan, Shraboni Jana + */ + +#pragma once + +#ifndef SUBSCRIPTION_HANDLER +#define SUBSCRIPTION_HANDLER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "subscription_delete_request.hpp" +#include "subscription_delete_response.hpp" +#include "subscription_request.hpp" +#include "subscription_response.hpp" + +#define SUBSCR_SUCCESS 1 +#define SUBSCR_ERR_TX -1 +#define SUBSCR_ERR_TIMEOUT -2 +#define SUBSCR_ERR_FAIL -3 +#define SUBSCR_ERR_UNKNOWN -4 +#define SUBSCR_ERR_DUPLICATE -5 + +using namespace std; + +class TransmitterBase +{ +public: + virtual ~TransmitterBase() {} + + template + const T& getParam() const; //to be implemented after Parameter + + template + void setParam(const U& rhs); //to be implemented after Parameter +}; + +template +class Transmitter : public TransmitterBase +{ +public: + Transmitter(const T& tx) :obj(tx) {} + const T& getParam() const {return obj;} + void setParam(const T& tx) {obj=tx;} +private: + T obj; +}; + +//Here's the trick: dynamic_cast rather than virtual +template const T& TransmitterBase::getParam() const +{ + return dynamic_cast&>(*this).getParam(); +} +template void TransmitterBase::setParam(const U& rhs) +{ + dynamic_cast&>(*this).setParam(rhs); + return; +} + +typedef enum { + request_pending = 1, + request_success, + request_failed, + request_duplicate +}Subscription_Status_Types; + + +using transaction_identifier = std::string; +using transaction_status = Subscription_Status_Types; + +class SubscriptionHandler { + +public: + + SubscriptionHandler(unsigned int timeout_seconds = 30); + + template + int manage_subscription_request(transaction_identifier, AppTransmitter &&); + + template + int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&); + + void manage_subscription_response(int message_type, transaction_identifier id); + + int get_request_status(transaction_identifier); + bool set_request_status(transaction_identifier, transaction_status); + bool is_request_entry(transaction_identifier); + void set_timeout(unsigned int); + void clear(void); + void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;}; + + void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}}; + +private: + + bool add_request_entry(transaction_identifier, transaction_status); + bool delete_request_entry(transaction_identifier); + + template + bool add_transmitter_entry(transaction_identifier, AppTransmitter&&); + + std::unordered_map trans_table; + std::unordered_map status_table; + + std::unique_ptr _data_lock; + std::unique_ptr _cv; + + std::chrono::seconds _time_out; + + bool _ignore_subs_resp = false; +}; + +template +bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){ + mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str()); + + // add entry in hash table if it does not exist + auto search = trans_table.find(id); + if(search != trans_table.end()){ + return false; + } + + Transmitter tptr(trans); + trans_table[id] = tptr; + return true; + +}; + +//this will work for both sending subscription request and subscription delete request. +//The handler is oblivious of the message content and follows the transaction id. +template +int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){ + int res; + // put entry in request table + { + std::lock_guard lock(*(_data_lock.get())); + + res = add_request_entry(rmr_trans_id, request_pending); + if(! res){ + mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id); + return SUBSCR_ERR_DUPLICATE; + } + } + + + // acquire lock ... + std::unique_lock _local_lock(*(_data_lock.get())); + + // Send the message + bool flg = tx(); + + if (!flg){ + // clear state + delete_request_entry(rmr_trans_id); + mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() ); + return SUBSCR_ERR_TX; + } else { + mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() ); + add_transmitter_entry(rmr_trans_id, tx); + + } + + // record time stamp .. + auto start = std::chrono::system_clock::now(); + std::chrono::milliseconds t_out(_time_out); + + //the wait functionality has been removed. + + + _local_lock.unlock(); + // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl; + return res; +}; + +#endif