2 ==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
30 #include <mdclog/mdclog.h>
32 #include <condition_variable>
33 #include <unordered_map>
37 #include <subscription_request.hpp>
38 #include <subscription_response.hpp>
39 #include <subscription_delete_request.hpp>
40 #include <subscription_delete_response.hpp>
// Result codes produced by subscription_handler::request_subscription and
// subscription_handler::request_subscription_delete.
// Request completed with status request_success / delete_request_success.
42 #define SUBSCR_SUCCESS 0
// The transmitter callable reported an error when sending the encoded request.
43 #define SUBSCR_ERR_TX 1
// Request was still pending after more than _num_retries * _time_out had elapsed.
44 #define SUBSCR_ERR_TIMEOUT 2
// Request status was request_failed / delete_request_failed.
45 #define SUBSCR_ERR_FAIL 3
// Initial result before waiting; also used when an unrecognised status is seen.
46 #define SUBSCR_ERR_UNKNOWN 4
// A request with the same key is already queued, or status was request_duplicate.
47 #define SUBSCR_ERR_DUPLICATE 5
// The request PDU could not be encoded.
48 #define SUBSCR_ERR_ENCODE 6
// Delete was requested for a subscription that does not exist.
49 #define SUBSCR_ERR_MISSING 7
// State a delete request is registered with by request_subscription_delete
// (passed to add_request_entry) while it waits for a response.
57 delete_request_pending,
// Status that request_subscription_delete maps to SUBSCR_SUCCESS.
58 delete_request_success,
// Status that request_subscription_delete maps to SUBSCR_ERR_FAIL.
59 delete_request_failed,
// These values are passed to add_request_entry and compared against the
// result of get_request_status.
61 }Subscription_Status_Types;
// Key identifying a subscription or a queued request. The request functions
// build it as std::make_tuple(node_id, he.get_function_id()).
63 using subscription_identifier = std::tuple<std::string , int>;
65 /* Class to process subscription related messages
66 The class provides mechanism to send and process
67 subscriptions and subscription deletes.
69 NOTE 1: It is currently unclear how an xAPP should identify a
70 subscription request/response pair uniquely in the absence/presence of
71 subscription manager. Ideally, the subscription manager should be
72 transparent to the xAPP but that may not be the case, i.e. the
73 subscription manager may modify the subscription request id fields.
74 The xAPP needs to identify uniquely not just the subscription response, but
75 also when it needs to send a delete for the corresponding request.
76 From that perspective, the fields present in both the subscription response and
77 a delete request are the RICrequestId and RANfunctionId. However, the subscription manager
78 may require that the RICrequestId fields be set to a specific value (TBD). Hence
79 for current purposes, a RIC subscription request is uniquely identified by the
80 tuple <gNodeB-ID, RANfunctionId>. This is not ideal, since potentially the same RANfunctionID
81 may be subscribed to in different modes, but for now this is the constraint.
84 NOTE 2: There is discussion on tracking subscription request/response using the RMR transaction ID.
85 However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission
86 medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR
87 related semantics, but simply accepts a function to send the request/delete request that accepts a signature
88 Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response.
91 NOTE 3: The subscription handler is thread-safe, i.e multiple elements
92 can request subscriptions/subscription deletes from multiple threads. However
93 this does not preclude conflict if multiple threads are trying to make
94 subscriptions based on the same identifier tuple (in which case, results will be internally
95 consistent, but may yield errors to calling agent).
// Hash functor that lets subscription_identifier be used as the key of the
// unordered_maps in subscription_handler.
99 struct subscription_hasher {
// Hashes the string obtained by appending the decimal form of the integer
// element to the string element of the key.
// NOTE(review): because the two parts are concatenated without a separator,
// distinct keys such as ("a1", 2) and ("a", 12) produce the same hash value.
100 size_t operator()(const subscription_identifier & key) const {
101 return std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
// Sends subscription and subscription-delete requests and tracks their state.
// Requests are keyed by subscription_identifier: the state of each queued
// request is kept in requests_table and the responses of successful
// subscriptions in subscription_responses.
105 class subscription_handler {
// Defaults: timeout of 5 (seconds, per the parameter name) and 2 retries.
109 subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
// Encodes a subscription request, sends it through the supplied transmitter
// and waits for its status. Produces one of the SUBSCR_* codes.
113 template <typename Transmitter>
114 int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
// Same flow for deleting an existing subscription. Produces one of the
// SUBSCR_* codes.
116 template<typename Transmitter>
117 int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&);
// NOTE(review): parameter meanings are not documented in this declaration;
// presumably invoked with a received response message -- confirm against the
// definition.
119 void Response(int, unsigned char *, int, const char *);
// The request functions compare the returned value against the
// Subscription_Status_Types enumerators.
120 int const get_request_status(subscription_identifier);
121 subscription_response_helper * const get_subscription(subscription_identifier);
123 unsigned int get_next_id(void);
// NOTE(review): presumably update _time_out and _num_retries -- confirm
// against the definitions.
124 void set_timeout(unsigned int);
125 void set_num_retries(unsigned int);
// Existence checks used by request_subscription_delete before it queues a
// delete request.
127 bool is_subscription_entry(subscription_identifier);
128 bool is_request_entry(subscription_identifier);
129 void get_subscription_keys(std::vector<subscription_identifier> &);
// NOTE(review): presumably the sizes of requests_table and
// subscription_responses respectively -- confirm against the definitions.
131 size_t num_pending(void) const;
132 size_t num_complete(void) const ;
// Request-table helpers. The request functions treat the bool result of
// add_request_entry as the signal that a request with the same key is
// already queued.
139 bool add_request_entry(subscription_identifier, int);
140 bool set_request_status(subscription_identifier, int);
141 bool delete_request_entry(subscription_identifier);
143 bool get_subscription_entry(subscription_identifier);
144 bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
145 bool delete_subscription_entry(subscription_identifier);
// Status of each queued request, keyed by subscription identifier.
147 std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
148 std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
// Mutex locked by the request functions while they add request entries and
// while they wait; _cv is the condition variable they wait on.
150 std::unique_ptr<std::mutex> _data_lock;
151 std::unique_ptr<std::condition_variable> _cv;
// Duration of a single condition-variable wait.
153 std::chrono::seconds _time_out;
// A pending request is reported as timed out once more than
// _num_retries * _time_out has elapsed.
154 unsigned int _num_retries = 2;
155 unsigned int unique_request_id = 0;
// Encodes a subscription request from `he`, registers it as pending under the
// key (node_id, he.get_function_id()), hands the encoded bytes to `tx` and
// then waits on the condition variable for the request status to change.
//
// he       : request description; its request id / sequence number are reset
//            to zero before encoding.
// response : receives the stored subscription response on success.
// node_id  : first element of the subscription identifier.
// TxCode   : passed through unchanged as the first argument of tx.
// tx       : callable invoked as tx(TxCode, buf_len, buffer); its result is
//            stored in res and a failure is reported as SUBSCR_ERR_TX.
//
// Result codes: SUBSCR_SUCCESS, SUBSCR_ERR_ENCODE, SUBSCR_ERR_DUPLICATE,
// SUBSCR_ERR_TX, SUBSCR_ERR_TIMEOUT, SUBSCR_ERR_FAIL, SUBSCR_ERR_UNKNOWN.
159 template <typename Transmitter>
160 int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
// Stack buffer that receives the encoded PDU. buf_len is handed to the
// encoder by pointer (presumably updated to the encoded length -- confirm)
// and is the length later given to tx.
163 unsigned char buffer[512];
164 size_t buf_len = 512;
166 // As per current design, request id and request sequence number
167 // must be set to zero ...
168 he.set_request(0, 0);
169 subscription_request e2ap_sub_req;
171 // generate subscription identifier
172 subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
174 // generate the request pdu
175 res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
// NOTE(review): the format string ends with "Reason = " but no reason
// argument is supplied; request_subscription_delete logs the encoder's
// get_error() text at the equivalent point.
177 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
178 return SUBSCR_ERR_ENCODE;
181 // put entry in request table
// The lock_guard holds the data mutex while the pending entry is added.
183 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
184 res = add_request_entry(sub_id, request_pending);
186 mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
187 return SUBSCR_ERR_DUPLICATE;
// A unique_lock is used here because the condition-variable wait below needs
// to release and re-acquire the mutex.
192 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
// Hand the encoded request to the caller-supplied transmitter.
196 res = tx(TxCode, buf_len, buffer);
// Transmission failed: drop the pending entry again before reporting the error.
199 delete_request_entry(sub_id);
200 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
201 return SUBSCR_ERR_TX;
205 // record time stamp ..
// `start` is the reference point for the elapsed-time check further down.
206 auto start = std::chrono::system_clock::now();
207 res = SUBSCR_ERR_UNKNOWN;
210 // release lock and wait to be woken up
211 _cv.get()->wait_for(_local_lock, _time_out);
213 // we have woken and acquired data_lock
214 // check status and return appropriate object
216 int status = get_request_status(sub_id);
218 if (status == request_success){
219 // retrieve & store the subscription response
// NOTE(review): operator[] default-inserts an entry if sub_id is not present
// in subscription_responses.
220 response = subscription_responses[sub_id];
221 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
222 res = SUBSCR_SUCCESS;
226 if (status == request_pending){
227 // woken up spuriously or timed out
228 auto end = std::chrono::system_clock::now();
229 std::chrono::duration<double> f = end - start;
// Overall deadline: elapsed time since `start` compared with
// _num_retries * _time_out.
231 if ( f > _num_retries * _time_out){
232 mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
233 res = SUBSCR_ERR_TIMEOUT;
237 mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
242 if(status == request_failed){
243 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
244 res = SUBSCR_ERR_FAIL;
248 if (status == request_duplicate){
249 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
250 res = SUBSCR_ERR_DUPLICATE;
255 // if we are here, some spurious
256 // status obtained or request failed . we return appropriate error code
257 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
258 res = SUBSCR_ERR_UNKNOWN;
// The request entry is removed before the lock is released.
262 delete_request_entry(sub_id);
265 _local_lock.unlock();
// NOTE(review): this diagnostic goes to std::cout, whereas every other
// message in this function uses mdclog_write.
266 std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl;
// Sends a delete request for the subscription keyed by
// (node_id, he.get_function_id()) and waits on the condition variable for the
// request status to change.
//
// he       : description of the subscription to delete; used to build the key
//            and to encode the delete PDU.
// response : NOTE(review): does not appear to be referenced in this function.
// node_id  : first element of the subscription identifier.
// TxCode   : passed through unchanged as the first argument of tx.
// tx       : callable invoked as tx(TxCode, buf_len, buffer); its result is
//            stored in res and a failure is reported as SUBSCR_ERR_TX.
//
// Result codes: SUBSCR_SUCCESS, SUBSCR_ERR_MISSING, SUBSCR_ERR_DUPLICATE,
// SUBSCR_ERR_ENCODE, SUBSCR_ERR_TX, SUBSCR_ERR_TIMEOUT, SUBSCR_ERR_FAIL,
// SUBSCR_ERR_UNKNOWN.
271 template <typename Transmitter>
272 int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
275 // generate subscription identifier
276 subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
278 // First check if we have this subscription
279 if(! is_subscription_entry(sub_id)){
280 mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
281 return SUBSCR_ERR_MISSING;
284 // Also check if such a request is queued
285 if (is_request_entry(sub_id)){
286 mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
287 return SUBSCR_ERR_DUPLICATE;
290 subscription_delete e2ap_sub_req_del;
292 // generate the delete request pdu
// Stack buffer that receives the encoded PDU. buf_len is handed to the
// encoder by pointer (presumably updated to the encoded length -- confirm)
// and is the length later given to tx.
293 unsigned char buffer[128];
294 size_t buf_len = 128;
296 res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
298 mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
299 return SUBSCR_ERR_ENCODE;
302 // put entry in request table
// The lock_guard holds the data mutex while the pending delete entry is added.
304 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
305 res = add_request_entry(sub_id, delete_request_pending);
307 mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
308 return SUBSCR_ERR_DUPLICATE;
// A unique_lock is used here because the condition-variable wait below needs
// to release and re-acquire the mutex.
312 std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
// Hand the encoded delete request to the caller-supplied transmitter.
315 res = tx(TxCode, buf_len, buffer);
// Transmission failed: drop the pending entry again before reporting the error.
318 delete_request_entry(sub_id);
319 mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
320 return SUBSCR_ERR_TX;
324 // record time stamp ..
// `start` is the reference point for the elapsed-time check further down.
325 auto start = std::chrono::system_clock::now();
327 res = SUBSCR_ERR_UNKNOWN;
330 // wait to be woken up
331 _cv.get()->wait_for(_local_lock, _time_out);
333 // check status and return appropriate object
334 int status = get_request_status(sub_id);
335 if (status == delete_request_success){
336 mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
337 res = SUBSCR_SUCCESS;
341 if (status == delete_request_pending){
342 // woken up spuriously or timed out
343 auto end = std::chrono::system_clock::now();
344 std::chrono::duration<double> f = end - start;
// Overall deadline: elapsed time since `start` compared with
// _num_retries * _time_out.
346 if (f > _num_retries * _time_out){
347 mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
348 res = SUBSCR_ERR_TIMEOUT;
352 mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
358 if(status == delete_request_failed){
359 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
360 res = SUBSCR_ERR_FAIL;
364 // if we are here, some spurious
365 // status obtained. we return false
367 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
368 res = SUBSCR_ERR_UNKNOWN;
// The request entry is removed before the lock is released.
373 delete_request_entry(sub_id);
376 _local_lock.unlock();
// NOTE(review): this diagnostic goes to std::cout, whereas every other
// message in this function uses mdclog_write.
377 std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;