2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
21 * Author: Ashwin Shridharan, Shraboni Jana
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
37 #include "../xapp-formats/e2ap/subscription_delete_request.hpp"
38 #include "../xapp-formats/e2ap/subscription_delete_response.hpp"
39 #include "../xapp-formats/e2ap/subscription_request.hpp"
40 #include "../xapp-formats/e2ap/subscription_response.hpp"
42 #define SUBSCR_SUCCESS 0
43 #define SUBSCR_ERR_TX 1
44 #define SUBSCR_ERR_TIMEOUT 2
45 #define SUBSCR_ERR_FAIL 3
46 #define SUBSCR_ERR_UNKNOWN 4
47 #define SUBSCR_ERR_DUPLICATE 5
48 #define SUBSCR_ERR_ENCODE 6
49 #define SUBSCR_ERR_MISSING 7
57 delete_request_pending,
58 delete_request_success,
59 delete_request_failed,
61 }Subscription_Status_Types;
63 using subscription_identifier = std::string; //here subscription_identifier is the rmr transaction id.
65 class SubscriptionHandler {
69 SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
73 template <typename Transmitter>
74 int request_subscription(std::string, Transmitter &&);
76 template<typename Transmitter>
77 int request_subscription_delete(std::string, int , Transmitter &&);
79 void Response(int, unsigned char *, int, const char *);
80 int const get_request_status(subscription_identifier);
81 subscription_response_helper * const get_subscription(subscription_identifier);
83 unsigned int get_next_id(void);
84 void set_timeout(unsigned int);
85 void set_num_retries(unsigned int);
87 bool is_subscription_entry(subscription_identifier);
88 bool is_request_entry(subscription_identifier);
89 void get_subscription_keys(std::vector<subscription_identifier> &);
91 size_t num_pending(void) const;
92 size_t num_complete(void) const ;
99 bool add_request_entry(subscription_identifier, int);
100 bool set_request_status(subscription_identifier, int);
101 bool delete_request_entry(subscription_identifier);
103 bool get_subscription_entry(subscription_identifier);
104 bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
105 bool delete_subscription_entry(subscription_identifier);
107 std::unordered_map<subscription_identifier, int> requests_table;
109 std::unique_ptr<std::mutex> _data_lock;
110 std::unique_ptr<std::condition_variable> _cv;
112 std::chrono::seconds _time_out;
113 unsigned int _num_retries = 2;
114 unsigned int unique_request_id = 0;
118 template <typename Transmitter>
119 int SubscriptionHandler::request_subscription(std::string rmr_trans_id, Transmitter && tx){
122 // put entry in request table
124 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
125 res = add_request_entry(rmr_trans_id, request_pending);
127 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
128 return SUBSCR_ERR_DUPLICATE;
133 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
140 delete_request_entry(rmr_trans_id);
141 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
142 return SUBSCR_ERR_TX;
146 // record time stamp ..
147 auto start = std::chrono::system_clock::now();
148 res = SUBSCR_ERR_UNKNOWN;
151 // release lock and wait to be woken up
152 _cv.get()->wait_for(_local_lock, _time_out);
154 // we have woken and acquired data_lock
155 // check status and return appropriate object
157 int status = get_request_status(rmr_trans_id);
159 if (status == request_success){
160 // retreive & store the subscription response (why?)
161 // response = subscription_responses[sub_id];
162 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", rmr_trans_id);
163 res = SUBSCR_SUCCESS;
167 if (status == request_pending){
168 // woken up spuriously or timed out
169 auto end = std::chrono::system_clock::now();
170 std::chrono::duration<double> f = end - start;
172 if ( f > _num_retries * _time_out){
173 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
174 res = SUBSCR_ERR_TIMEOUT;
178 mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response ....", rmr_trans_id);
183 if(status == request_failed){
184 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
185 res = SUBSCR_ERR_FAIL;
189 if (status == request_duplicate){
190 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
191 res = SUBSCR_ERR_DUPLICATE;
196 // if we are here, some spurious
197 // status obtained or request failed . we return appropriate error code
198 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
199 res = SUBSCR_ERR_UNKNOWN;
203 delete_request_entry(rmr_trans_id);
206 _local_lock.unlock();
207 std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;