2 ==================================================================================
3 Copyright (c) 2019-2020 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
38 #include "../xapp-asn/e2ap/subscription_delete_request.hpp"
39 #include "../xapp-asn/e2ap/subscription_delete_response.hpp"
40 #include "../xapp-asn/e2ap/subscription_request.hpp"
41 #include "../xapp-asn/e2ap/subscription_response.hpp"
43 #define SUBSCR_SUCCESS 1
44 #define SUBSCR_ERR_TX -1
45 #define SUBSCR_ERR_TIMEOUT -2
46 #define SUBSCR_ERR_FAIL -3
47 #define SUBSCR_ERR_UNKNOWN -4
48 #define SUBSCR_ERR_DUPLICATE -5
55 virtual ~TransmitterBase() {}
58 const T& getParam() const; //to be implemented after Parameter
60 template<class T, class U>
61 void setParam(const U& rhs); //to be implemented after Parameter
65 class Transmitter : public TransmitterBase
68 Transmitter(const T& tx) :obj(tx) {}
69 const T& getParam() const {return obj;}
70 void setParam(const T& tx) {obj=tx;}
75 //Here's the trick: dynamic_cast rather than virtual
76 template<class T> const T& TransmitterBase::getParam() const
78 return dynamic_cast<const Transmitter<T>&>(*this).getParam();
80 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
82 dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
90 delete_request_pending,
91 delete_request_success,
92 delete_request_failed,
94 }Subscription_Status_Types;
96 using transaction_identifier = std::string;
97 using transaction_status = Subscription_Status_Types;
99 class SubscriptionHandler {
103 SubscriptionHandler(unsigned int timeout_seconds = 10);
105 template <typename AppTransmitter>
106 int manage_subscription_request(transaction_identifier, AppTransmitter &&);
108 template <typename AppTransmitter>
109 int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
111 void manage_subscription_response(int message_type, transaction_identifier id);
113 int const get_request_status(transaction_identifier);
114 bool set_request_status(transaction_identifier, transaction_status);
115 bool is_request_entry(transaction_identifier);
116 void set_timeout(unsigned int);
118 void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
122 bool add_request_entry(transaction_identifier, transaction_status);
123 bool delete_request_entry(transaction_identifier);
125 template <typename AppTransmitter>
126 bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
128 std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
129 std::unordered_map<transaction_identifier, transaction_status> status_table;
131 std::unique_ptr<std::mutex> _data_lock;
132 std::unique_ptr<std::condition_variable> _cv;
134 std::chrono::seconds _time_out;
136 bool _ignore_subs_resp = false;
139 template <typename AppTransmitter>
140 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
142 // add entry in hash table if it does not exist
143 auto search = trans_table.find(id);
144 if(search != trans_table.end()){
148 Transmitter<AppTransmitter> tptr(trans);
149 trans_table[id] = tptr;
154 //this will work for both sending subscription request and subscription delete request.
155 //The handler is oblivious of the message content and follows the transaction id.
156 template<typename AppTransmitter>
157 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
159 // put entry in request table
161 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
163 res = add_request_entry(rmr_trans_id, request_pending);
165 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
166 return SUBSCR_ERR_DUPLICATE;
172 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
179 delete_request_entry(rmr_trans_id);
180 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
181 return SUBSCR_ERR_TX;
183 mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
184 add_transmitter_entry(rmr_trans_id, tx);
188 // record time stamp ..
189 auto start = std::chrono::system_clock::now();
190 res = SUBSCR_ERR_UNKNOWN;
194 // release lock and wait to be woken up
195 _cv.get()->wait_for(_local_lock, _time_out);
197 // we have woken and acquired data_lock
198 // check status and return appropriate object
199 int status = get_request_status(rmr_trans_id);
201 if (status == request_success){
202 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id.c_str());
203 res = SUBSCR_SUCCESS;
207 if (status == request_pending){
208 // woken up spuriously or timed out
209 auto end = std::chrono::system_clock::now();
210 std::chrono::duration<double> f = end - start;
214 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id.c_str());
216 res = SUBSCR_ERR_TIMEOUT;
217 //sunny side scenario. assuming subscription response is received.
218 //res = SUBSCR_SUCCESS;
222 mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id.c_str());
228 if(status == request_failed){
229 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
230 res = SUBSCR_ERR_FAIL;
234 if (status == request_duplicate){
235 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
236 res = SUBSCR_ERR_DUPLICATE;
241 // if we are here, some spurious
242 // status obtained or request failed . we return appropriate error code
243 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
244 res = SUBSCR_ERR_UNKNOWN;
248 delete_request_entry(rmr_trans_id);
251 _local_lock.unlock();
252 // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;