Added debugging and fixes for incorrect messages
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <mdclog/mdclog.h>
30 #include <mutex>
31 #include <condition_variable>
32 #include <unordered_map>
33 #include <chrono>
34
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
39
40 using namespace std;
41
42 typedef enum {
43     request_pending = 1,
44     request_success,
45     request_failed,
46     delete_request_pending,
47     delete_request_success,
48     delete_request_failed,
49     request_duplicate
50 }Subscription_Status_Types;
51
52
53 /* Class to process subscription related messages 
54    each subscription request is assigned a unique internally
55 generated request id for tracking purposes. this is because
56 the e2 subscription request does not carry any gnodeb id information
57
58 */
59
60 class SubscriptionHandler {
61
62 public:
63   SubscriptionHandler(void);
64   SubscriptionHandler(unsigned int, unsigned int);
65   
66   void init(void);
67   template <typename Transmitter>
68   bool RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);
69
70   template<typename Transmitter>
71   bool RequestSubscriptionDelete(subscription_helper  &, subscription_response_helper &, int , Transmitter &&);
72
73
74   void  Response(int, unsigned char *, int);
75   int const get_request_status(int);
76   subscription_response_helper * const get_subscription(int);
77
78   unsigned int get_next_id(void);
79   void set_timeout(unsigned int);
80   void set_timeout_flag(bool);
81   void set_num_retries(unsigned int);
82   
83   bool is_subscription_entry(int); 
84   bool is_request_entry(int);
85
86   void clear(void);
87   size_t  num_pending(void) const;
88   size_t  num_complete(void) const ;
89
90
91   
92 private:
93
94   void ProcessSubscriptionResponse(unsigned char *, int len);
95   void ProcessSubscriptionFailure(unsigned char *, int len);
96
97   void ProcessDeleteResponse(unsigned char *, int len);
98   void ProcessSubscriptionDeleteFailure(unsigned char *, int len);
99
100   bool add_request_entry(int, int);
101   bool set_request_status(int, int);
102   bool delete_request_entry(int);
103  
104   bool get_subscription_entry(int, int);
105   bool add_subscription_entry(int, subscription_response_helper &he);
106   bool delete_subscription_entry(int);
107   
108   std::unordered_map<int, int> requests_table;
109   std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
110   
111   std::unique_ptr<std::mutex> _data_lock;
112   std::unique_ptr<std::condition_variable> _cv;
113
114   std::chrono::seconds _time_out;
115   unsigned int _num_retries = 2;
116   bool _time_out_flag = true;
117   unsigned int unique_request_id = 0;
118   
119 };
120
121 template <typename Transmitter>
122 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
123   
124   bool res;
125   unsigned char buffer[512];
126   size_t buf_len = 512;
127
128   // get a new unique request id ...
129   unsigned int new_req_id = get_next_id();
130   mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
131   he.set_request(new_req_id, he.get_req_seq());
132   
133   E2AP_PDU_t *e2ap_pdu = 0;
134   subscription_request e2ap_sub_req;
135   
136   // generate the request pdu
137   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);
138   if(! res){
139     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
140     return false;
141   }
142   
143   // put entry in request table
144   {
145     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
146     res = add_request_entry(he.get_request_id(), request_pending);
147     if(! res){
148       mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
149       return false;
150     }
151   }
152
153   // acquire lock ...
154   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
155
156
157   // Send the message
158   res = tx(TxCode,  buf_len, buffer);
159   if (!res){
160     // clear state
161     delete_request_entry(he.get_request_id());
162     mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());
163     return false;
164   };
165
166   
167   // record time stamp ..
168   auto start = std::chrono::system_clock::now();
169   
170   while(1){
171
172
173     // release lock and wait to be woken up
174     _cv.get()->wait_for(_local_lock, _time_out);
175     
176     // we have woken and acquired data_lock 
177     // check status and return appropriate object
178     
179     int status = get_request_status(he.get_request_id());
180     
181     if (status == request_success){
182       // retreive the subscription response and clear queue
183       response = subscription_responses[he.get_request_id()];
184
185       // clear state
186       delete_request_entry(he.get_request_id());
187       
188       // release data lock
189       _local_lock.unlock();
190       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
191  
192       break;
193     }
194     
195     if (status == request_pending){
196
197       // woken up spuriously or timed out 
198       auto end = std::chrono::system_clock::now();
199       std::chrono::duration<double> f = end - start;
200       
201       if (_time_out_flag && f > _num_retries * _time_out){
202         delete_request_entry(he.get_request_id());
203         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
204
205         // Release data lock
206         _local_lock.unlock();
207         return false;
208       }
209       else{
210         mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     
211         continue;
212       }
213     }
214
215     // if we are here, some spurious
216     // status obtained or request failed . we return false
217     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
218     delete_request_entry(he.get_request_id());
219     
220     // release data lock
221     _local_lock.unlock();
222
223     return false;
224     
225   };
226              
227   
228   return true;
229 };
230
231
232 template <typename Transmitter>
233 bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
234   
235   bool res;
236
237   // First check if we have this subscription
238   if(! is_subscription_entry(he.get_request_id())){
239     mdclog_write(MDCLOG_ERR, "subscription with id %d  does not exist. Cannot be deleted",  he.get_request_id());       
240     return false;
241   }  
242   
243   // Also check if such a request is queued
244   if (is_request_entry(he.get_request_id())){
245     mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %d  already in queue",  he.get_request_id());
246     return false;
247   }
248
249   subscription_delete e2ap_sub_req_del;
250   
251   // generate the delete request pdu
252   unsigned char buffer[128];
253   size_t buf_len = 128;
254   
255   res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
256   if(! res){
257     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
258     return false;
259   }
260   
261   // put entry in request table
262   res = add_request_entry(he.get_request_id(), delete_request_pending);
263   if(! res){
264     mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
265     return false;
266   }
267   
268   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
269
270   // Send the message
271   res = tx(TxCode,  buf_len, buffer);
272
273   if (!res){
274     delete_request_entry(he.get_request_id());
275     mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
276     return false;
277   };
278
279   
280   // record time stamp ..
281   auto start = std::chrono::system_clock::now();
282
283   while(1){
284     
285     // wait to be woken up
286     _cv.get()->wait_for(_local_lock, _time_out);
287     
288     // check status and return appropriate object
289     int status = get_request_status(he.get_request_id());
290     if (status == delete_request_success){
291       delete_request_entry(he.get_request_id());
292       mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
293       _local_lock.unlock();
294       break;
295     }
296     
297     if (status == delete_request_pending){
298       // woken up spuriously or timed out 
299       auto end = std::chrono::system_clock::now();
300       std::chrono::duration<double> f = end - start;
301       
302       if (_time_out_flag && f > _num_retries * _time_out){
303         delete_request_entry(he.get_request_id());
304         mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
305         _local_lock.unlock();
306         return false;
307       }
308       else{
309         mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); 
310       }
311       
312       continue;
313     }
314
315     // if we are hear, some spurious
316     // status obtained. we return false
317     
318     delete_request_entry(he.get_request_id());
319     _local_lock.unlock();
320     return false;
321
322   };
323
324   return true;
325 };
326
327 #endif