User story RICPLT-2620
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <chrono>
35 #include <tuple>
36
37 #include <subscription_request.hpp>
38 #include <subscription_response.hpp>
39 #include <subscription_delete_request.hpp>
40 #include <subscription_delete_response.hpp>
41
42 #define SUBSCR_SUCCESS 0
43 #define SUBSCR_ERR_TX 1
44 #define SUBSCR_ERR_TIMEOUT 2
45 #define SUBSCR_ERR_FAIL 3
46 #define SUBSCR_ERR_UNKNOWN 4
47 #define SUBSCR_ERR_DUPLICATE 5
48 #define SUBSCR_ERR_ENCODE  6
49 #define SUBSCR_ERR_MISSING 7
50
51 using namespace std;
52
53 typedef enum {
54     request_pending = 1,
55     request_success,
56     request_failed,
57     delete_request_pending,
58     delete_request_success,
59     delete_request_failed,
60     request_duplicate
61 }Subscription_Status_Types;
62
63 using subscription_identifier = std::tuple<std::string , int>;
64
65 /* Class to process subscription related messages 
66    The class provides mechanism to send and process
67    subscriptions and subscription deletes. 
68
69    NOTE 1: It is currently unclear how an xAPP should identify a
70    subscription request/response pair uniquely in the absence/presence of
71    subscription manager.  Ideally, the subscription manager should be
72    transperent to the xAPP but that may not be the case, i.e the
73    subscription manager may the subscription request id fields. 
74    The xAPP needs to identify uniquely not just the subscription response, but
75    also when it needs to send a delete for the corresponding request.
76    From that perspective, the fields present in both the subscription response and 
77    a delete request are the RICrequestId and RANfunctionId. However, the subscription manager
78    may require that the RICrequestId fields be set to a specific value (TBD).  Hence
79    for current purposes, a RIC subscription request is uniquely identified by the
80    tuple <gNodeB-ID, RANfunctionId>.  This is not ideal, since potentially the same RANfunctionID
81    may be subscribed to in different modes, but for now this is the constraint.
82
83
84    NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID.
85    However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission
86    medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR
87    related semantics, but simply accepts a function to send the request/delete request that accepts a signature
88    Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. 
89   
90
91    NOTE 3: The subscription handler is thread-safe, i.e multiple elements
92    can request subscriptions/subscription deletes from multiple threads. However
93    this does not preclude conflict if multiple threads are trying to make
94    subscriptions based on the same triplet (in which cases, results will be internally
95    consistent, but may yield errors to calling agent).
96
97 */
98
99 struct subscription_hasher {
100   size_t operator()(const subscription_identifier & key) const {
101     return  std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
102   }
103 };
104
105 class subscription_handler {
106                             
107 public:
108
109   subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
110   
111   void init(void);
112
113   template <typename Transmitter>
114   int request_subscription(subscription_helper &,  subscription_response_helper &, std::string, int ,   Transmitter &&);
115
116   template<typename Transmitter>
117   int request_subscription_delete(subscription_helper  &, subscription_response_helper &, std::string,  int ,  Transmitter &&);
118
119   void  Response(int, unsigned char *, int, const char *);
120   int const get_request_status(subscription_identifier);
121   subscription_response_helper * const get_subscription(subscription_identifier);
122   
123   unsigned int get_next_id(void);
124   void set_timeout(unsigned int);
125   void set_num_retries(unsigned int);
126   
127   bool is_subscription_entry(subscription_identifier); 
128   bool is_request_entry(subscription_identifier);
129   void get_subscription_keys(std::vector<subscription_identifier> &);
130   void clear(void);
131   size_t  num_pending(void) const;
132   size_t  num_complete(void) const ;
133
134
135   
136 private:
137
138   
139   bool add_request_entry(subscription_identifier, int);
140   bool set_request_status(subscription_identifier, int);
141   bool delete_request_entry(subscription_identifier);
142  
143   bool get_subscription_entry(subscription_identifier);
144   bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
145   bool delete_subscription_entry(subscription_identifier);
146   
147   std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
148   std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
149   
150   std::unique_ptr<std::mutex> _data_lock;
151   std::unique_ptr<std::condition_variable> _cv;
152
153   std::chrono::seconds _time_out;
154   unsigned int _num_retries = 2;
155   unsigned int unique_request_id = 0;
156   
157 };
158
159 template <typename Transmitter>
160 int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
161   
162   int res;
163   unsigned char buffer[512];
164   size_t buf_len = 512;
165
166   // As per current design, request id and request sequence number
167   // must be set to zero ...  
168   he.set_request(0, 0);  
169   subscription_request e2ap_sub_req;
170
171   // generate subscription identifier
172   subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
173   
174   // generate the request pdu
175   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
176   if(! res){
177     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
178     return SUBSCR_ERR_ENCODE;
179   }
180   
181   // put entry in request table
182   {
183     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
184     res = add_request_entry(sub_id, request_pending);
185     if(! res){
186       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present",  __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
187       return SUBSCR_ERR_DUPLICATE;
188     }
189   }
190
191   // acquire lock ...
192   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
193
194
195   // Send the message
196   res = tx(TxCode,  buf_len, buffer);
197   if (!res){
198     // clear state
199     delete_request_entry(sub_id);
200     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
201     return SUBSCR_ERR_TX;
202   };
203
204   
205   // record time stamp ..
206   auto start = std::chrono::system_clock::now();
207   res = SUBSCR_ERR_UNKNOWN;
208   
209   while(1){
210     // release lock and wait to be woken up
211     _cv.get()->wait_for(_local_lock, _time_out);
212     
213     // we have woken and acquired data_lock 
214     // check status and return appropriate object
215     
216     int status = get_request_status(sub_id);
217     
218     if (status == request_success){
219       // retreive  & store the subscription response 
220       response = subscription_responses[sub_id];
221       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
222       res = SUBSCR_SUCCESS;
223       break;
224     }
225     
226     if (status == request_pending){
227       // woken up spuriously or timed out 
228       auto end = std::chrono::system_clock::now();
229       std::chrono::duration<double> f = end - start;
230       
231       if ( f > _num_retries * _time_out){
232         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
233         res = SUBSCR_ERR_TIMEOUT;
234         break;
235       }
236       else{
237         mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));     
238         continue;
239       }
240     }
241
242     if(status == request_failed){
243       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
244       res = SUBSCR_ERR_FAIL;
245       break;
246     }
247
248     if (status == request_duplicate){
249       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
250       res = SUBSCR_ERR_DUPLICATE;
251       break;
252
253     }
254     
255     // if we are here, some spurious
256     // status obtained or request failed . we return appropriate error code 
257     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
258     res = SUBSCR_ERR_UNKNOWN;
259     break;
260   };
261
262   delete_request_entry(sub_id);
263   
264   // release data lock
265   _local_lock.unlock();
266   std::cout <<"Returning  res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," <<  std::get<1>(sub_id) << std::endl;  
267   return res;
268 };
269
270
271 template <typename Transmitter>
272 int  subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id,  int TxCode, Transmitter && tx){
273   
274   int res;
275   // generate subscription identifier 
276   subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
277   
278   // First check if we have this subscription
279   if(! is_subscription_entry(sub_id)){
280     mdclog_write(MDCLOG_ERR, "subscription with id %s, %d  does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));        
281     return SUBSCR_ERR_MISSING;
282   }  
283   
284   // Also check if such a request is queued
285   if (is_request_entry(sub_id)){
286     mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %s, %d  already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
287     return SUBSCR_ERR_DUPLICATE;
288   }
289
290   subscription_delete e2ap_sub_req_del;
291   
292   // generate the delete request pdu
293   unsigned char buffer[128];
294   size_t buf_len = 128;
295   
296   res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
297   if(! res){
298     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
299     return SUBSCR_ERR_ENCODE;
300   }
301   
302   // put entry in request table
303   {
304     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
305     res = add_request_entry(sub_id, delete_request_pending);
306     if(!res){
307       mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate  subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
308       return SUBSCR_ERR_DUPLICATE;
309     }
310   }
311   
312   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
313
314   // Send the message
315   res = tx(TxCode,  buf_len, buffer);
316
317   if (!res){
318     delete_request_entry(sub_id);
319     mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
320     return SUBSCR_ERR_TX;
321   };
322
323   
324   // record time stamp ..
325   auto start = std::chrono::system_clock::now();
326
327   res = SUBSCR_ERR_UNKNOWN;
328   while(1){
329     
330     // wait to be woken up
331     _cv.get()->wait_for(_local_lock, _time_out);
332     
333     // check status and return appropriate object
334     int status = get_request_status(sub_id);
335     if (status == delete_request_success){
336       mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
337       res = SUBSCR_SUCCESS;
338       break;
339     }
340     
341     if (status == delete_request_pending){
342       // woken up spuriously or timed out 
343       auto end = std::chrono::system_clock::now();
344       std::chrono::duration<double> f = end - start;
345       
346       if (f > _num_retries * _time_out){
347         mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
348         res = SUBSCR_ERR_TIMEOUT;
349         break;
350       }
351       else{
352         mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); 
353       }
354       
355       continue;
356     }
357
358     if(status == delete_request_failed){
359       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
360       res = SUBSCR_ERR_FAIL;
361       break;
362     }
363
364     // if we are here, some spurious
365     // status obtained. we return false
366     
367     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
368     res =  SUBSCR_ERR_UNKNOWN;
369     break;
370
371   };
372
373   delete_request_entry(sub_id);
374   
375   // release data lock
376   _local_lock.unlock();
377   std::cout <<"Returning  res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
378   return res;
379 };
380
381 #endif