2 ==================================================================================
3 Copyright (c) 2019-2020 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
39 #include <rmr/RIC_message_types.h>
41 #include "subscription_delete_request.hpp"
42 #include "subscription_delete_response.hpp"
43 #include "subscription_request.hpp"
44 #include "subscription_response.hpp"
// Integer result codes for subscription operations.
// Within the code visible in this header, SUBSCR_ERR_DUPLICATE and SUBSCR_ERR_TX are
// returned by SubscriptionHandler::manage_subscription_request; where the remaining
// codes are produced is not visible here.
// Operation succeeded.
46 #define SUBSCR_SUCCESS 1
// Transmitting the request failed.
47 #define SUBSCR_ERR_TX -1
// Presumably: no response arrived within the configured timeout (usage not visible here).
48 #define SUBSCR_ERR_TIMEOUT -2
// Presumably: the request was answered with a failure (usage not visible here).
49 #define SUBSCR_ERR_FAIL -3
// Presumably: unclassified error (usage not visible here).
50 #define SUBSCR_ERR_UNKNOWN -4
// A request with the same transaction identifier is already registered.
51 #define SUBSCR_ERR_DUPLICATE -5
// Virtual destructor. It also makes TransmitterBase a polymorphic type, which the
// dynamic_cast in the out-of-line getParam()/setParam() definitions requires.
58 virtual ~TransmitterBase() {}
// Typed read access to the value held by the derived Transmitter<T>.
// Only declared here: the definition casts *this to Transmitter<T>, so it has to
// appear after the Transmitter class.
61 const T& getParam() const; // defined after the Transmitter class
// Typed write access to the value held by the derived Transmitter<T>.
// T selects the Transmitter<T> to cast to; U is the type of the value supplied.
// Only declared here: the definition casts *this to Transmitter<T>, so it has to
// appear after the Transmitter class.
63 template<class T, class U>
64 void setParam(const U& rhs); // defined after the Transmitter class
// Holder for a single value of type T (stored in the member obj, whose declaration is
// outside the lines visible here). Deriving from TransmitterBase lets
// TransmitterBase::getParam<T>() and setParam<T, U>() reach the value via dynamic_cast.
68 class Transmitter : public TransmitterBase
// Initializes the stored value from tx.
71 Transmitter(const T& tx) :obj(tx) {}
// Returns a const reference to the stored value.
72 const T& getParam() const {return obj;}
// Assigns tx to the stored value.
73 void setParam(const T& tx) {obj=tx;}
78 // The trick: use dynamic_cast to the concrete holder instead of a virtual function.
// Casts *this to const Transmitter<T>& and forwards to its getParam().
// Because the target is a reference type, dynamic_cast throws std::bad_cast when the
// dynamic type of *this is not Transmitter<T>.
79 template<class T> const T& TransmitterBase::getParam() const
81 return dynamic_cast<const Transmitter<T>&>(*this).getParam();
// Casts *this to Transmitter<T>& and forwards rhs to its setParam(const T&); rhs (of
// type U) is converted to T by that call. Because the target is a reference type,
// dynamic_cast throws std::bad_cast when the dynamic type of *this is not Transmitter<T>.
83 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
85 dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
94 }Subscription_Status_Types;
// Key type used to identify a transaction in SubscriptionHandler's tables.
97 using transaction_identifier = std::string;
// Per-transaction status value (the Subscription_Status_Types enumeration).
98 using transaction_status = Subscription_Status_Types;
// Tracks subscription transactions by transaction identifier: a status per transaction
// (status_table) and a type-erased transmitter per transaction (trans_table).
100 class SubscriptionHandler {
// timeout_seconds defaults to 30; presumably used to initialize _time_out (the
// constructor body is not visible here).
104 SubscriptionHandler(unsigned int timeout_seconds = 30);
// Registers and sends a subscription request for the given transaction id using the
// supplied transmitter; returns one of the SUBSCR_* codes. Defined later in this header.
106 template <typename AppTransmitter>
107 int manage_subscription_request(transaction_identifier, AppTransmitter &&);
// Counterpart for subscription delete requests (definition not visible here).
109 template <typename AppTransmitter>
110 int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
// Handles an incoming response of the given message type for transaction `id`
// (definition not visible here).
112 void manage_subscription_response(int message_type, transaction_identifier id);
// Status accessors and queries keyed by transaction id (definitions not visible here).
114 int get_request_status(transaction_identifier);
115 bool set_request_status(transaction_identifier, transaction_status);
116 bool is_request_entry(transaction_identifier);
// Presumably updates _time_out (definition not visible here).
117 void set_timeout(unsigned int);
// Sets the _ignore_subs_resp flag.
119 void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
// Prints every status_table entry to std::cout as "<id>::<status>", one per line.
// Each entry is copied during iteration, and no lock is taken in this function.
121 void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}};
// Insert / remove a status_table entry (definitions not visible here).
125 bool add_request_entry(transaction_identifier, transaction_status);
126 bool delete_request_entry(transaction_identifier);
// Stores a transmitter for a transaction in trans_table. Defined later in this header.
128 template <typename AppTransmitter>
129 bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
// NOTE(review): the mapped type is TransmitterBase held by value. add_transmitter_entry
// assigns a Transmitter<AppTransmitter> into this map, which copies only the base
// sub-object (object slicing); a later getParam<T>()/setParam<T, U>() on the stored
// element would then fail its dynamic_cast with std::bad_cast. Consider storing a
// smart pointer to TransmitterBase instead - confirm against the users of trans_table.
131 std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
// Current status of each known transaction.
132 std::unordered_map<transaction_identifier, transaction_status> status_table;
// Mutex locked by manage_subscription_request around its table updates.
134 std::unique_ptr<std::mutex> _data_lock;
// Condition variable; its use is not visible in this part of the file.
135 std::unique_ptr<std::condition_variable> _cv;
// Timeout in seconds; manage_subscription_request converts it to milliseconds.
137 std::chrono::seconds _time_out;
// Flag set through set_ignore_subs_resp(); defaults to false.
139 bool _ignore_subs_resp = false;
// Records a transmitter for transaction `id` in trans_table. Per the comment below, the
// entry is meant to be added only when no entry for `id` exists yet; the body of the
// duplicate branch and the return statements are outside the lines visible here.
142 template <typename AppTransmitter>
143 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
// NOTE(review): this "Entry added" message is logged before the duplicate check runs.
144 mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str());
146 // add entry in hash table if it does not exist
147 auto search = trans_table.find(id);
// Taken when an entry for `id` is already present.
148 if(search != trans_table.end()){
// Build a Transmitter holder from the supplied transmitter.
152 Transmitter<AppTransmitter> tptr(trans);
// NOTE(review): trans_table maps to TransmitterBase by value, so this assignment copies
// only the base sub-object of tptr (object slicing); the held transmitter is not stored.
// Confirm whether the map should hold a pointer to TransmitterBase instead.
153 trans_table[id] = tptr;
158 // This works for sending both subscription requests and subscription delete requests.
159 // The handler is oblivious to the message content and tracks only the transaction id.
160 template<typename AppTransmitter>
161 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
163 // put entry in request table
165 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
167 res = add_request_entry(rmr_trans_id, request_pending);
169 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
170 return SUBSCR_ERR_DUPLICATE;
176 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
183 delete_request_entry(rmr_trans_id);
184 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
185 return SUBSCR_ERR_TX;
187 mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
188 add_transmitter_entry(rmr_trans_id, tx);
192 // record time stamp ..
193 auto start = std::chrono::system_clock::now();
194 std::chrono::milliseconds t_out(_time_out);
196 //the wait functionality has been removed.
199 _local_lock.unlock();
200 // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;