2 ==================================================================================
3 Copyright (c) 2019-2020 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
37 #include "../xapp-asn/e2ap/subscription_delete_request.hpp"
38 #include "../xapp-asn/e2ap/subscription_delete_response.hpp"
39 #include "../xapp-asn/e2ap/subscription_request.hpp"
40 #include "../xapp-asn/e2ap/subscription_response.hpp"
42 #define SUBSCR_SUCCESS 1
43 #define SUBSCR_ERR_TX -1
44 #define SUBSCR_ERR_TIMEOUT -2
45 #define SUBSCR_ERR_FAIL -3
46 #define SUBSCR_ERR_UNKNOWN -4
47 #define SUBSCR_ERR_DUPLICATE -5
54 virtual ~TransmitterBase() {}
57 const T& getParam() const; //to be implemented after Parameter
59 template<class T, class U>
60 void setParam(const U& rhs); //to be implemented after Parameter
64 class Transmitter : public TransmitterBase
67 Transmitter(const T& tx) :obj(tx) {}
68 const T& getParam() const {return obj;}
69 void setParam(const T& tx) {obj=tx;}
74 //Here's the trick: dynamic_cast rather than virtual
75 template<class T> const T& TransmitterBase::getParam() const
77 return dynamic_cast<const Transmitter<T>&>(*this).getParam();
79 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
81 dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
89 delete_request_pending,
90 delete_request_success,
91 delete_request_failed,
93 }Subscription_Status_Types;
95 using transaction_identifier = unsigned char*;
96 using transaction_status = Subscription_Status_Types;
98 class SubscriptionHandler {
102 SubscriptionHandler(unsigned int timeout_seconds = 10);
104 template <typename AppTransmitter>
105 int manage_subscription_request(transaction_identifier, AppTransmitter &&);
107 template <typename AppTransmitter>
108 int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
110 void manage_subscription_response(int message_type, transaction_identifier id);
112 int const get_request_status(transaction_identifier);
113 bool set_request_status(transaction_identifier, transaction_status);
114 bool is_request_entry(transaction_identifier);
115 void set_timeout(unsigned int);
117 void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
121 bool add_request_entry(transaction_identifier, transaction_status);
122 bool delete_request_entry(transaction_identifier);
124 template <typename AppTransmitter>
125 bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
127 std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
128 std::unordered_map<transaction_identifier, transaction_status> status_table;
130 std::unique_ptr<std::mutex> _data_lock;
131 std::unique_ptr<std::condition_variable> _cv;
133 std::chrono::seconds _time_out;
135 bool _ignore_subs_resp = false;
138 template <typename AppTransmitter>
139 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
141 // add entry in hash table if it does not exist
142 auto search = trans_table.find(id);
143 if(search != trans_table.end()){
147 Transmitter<AppTransmitter> tptr(trans);
148 trans_table[id] = tptr;
153 //this will work for both sending subscription request and subscription delete request.
154 //The handler is oblivious of the message content and follows the transaction id.
155 template<typename AppTransmitter>
156 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
158 // put entry in request table
160 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
162 res = add_request_entry(rmr_trans_id, request_pending);
164 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
165 return SUBSCR_ERR_DUPLICATE;
171 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
178 delete_request_entry(rmr_trans_id);
179 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
180 return SUBSCR_ERR_TX;
182 mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id );
183 add_transmitter_entry(rmr_trans_id, tx);
187 // record time stamp ..
188 auto start = std::chrono::system_clock::now();
189 res = SUBSCR_ERR_UNKNOWN;
193 // release lock and wait to be woken up
194 _cv.get()->wait_for(_local_lock, _time_out);
196 // we have woken and acquired data_lock
197 // check status and return appropriate object
198 int status = get_request_status(rmr_trans_id);
200 if (status == request_success){
201 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id);
202 res = SUBSCR_SUCCESS;
206 if (status == request_pending){
207 // woken up spuriously or timed out
208 auto end = std::chrono::system_clock::now();
209 std::chrono::duration<double> f = end - start;
213 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
215 //res = SUBSCR_ERR_TIMEOUT;
216 //sunny side scenario. assuming subscription response is received.
217 res = SUBSCR_SUCCESS;
221 mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id);
227 if(status == request_failed){
228 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
229 res = SUBSCR_ERR_FAIL;
233 if (status == request_duplicate){
234 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
235 res = SUBSCR_ERR_DUPLICATE;
240 // if we are here, some spurious
241 // status obtained or request failed . we return appropriate error code
242 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
243 res = SUBSCR_ERR_UNKNOWN;
247 delete_request_entry(rmr_trans_id);
250 _local_lock.unlock();
251 // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;