fcfd77f756ffee13561ea012b535f8b2b529e426
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <mdclog/mdclog.h>
30 #include <mutex>
31 #include <condition_variable>
32 #include <unordered_map>
33 #include <chrono>
34
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
39
40 #define SUBSCR_SUCCESS 0
41 #define SUBSCR_ERR_TX 1
42 #define SUBSCR_ERR_TIMEOUT 2
43 #define SUBSCR_ERR_FAIL 3
44 #define SUBSCR_ERR_UNKNOWN 4
45 #define SUBSCR_ERR_DUPLICATE 5
46 #define SUBSCR_ERR_ENCODE  6
47 #define SUBSCR_ERR_MISSING 7
48
49 using namespace std;
50
51 typedef enum {
52     request_pending = 1,
53     request_success,
54     request_failed,
55     delete_request_pending,
56     delete_request_success,
57     delete_request_failed,
58     request_duplicate
59 }Subscription_Status_Types;
60
61
62 /* Class to process subscription related messages 
63    each subscription request is assigned a unique internally
64 generated request id for tracking purposes. this is because
65 the e2 subscription request does not carry any gnodeb id information
66
67 */
68
69 class subscription_handler {
70
71 public:
72   subscription_handler(void);
73   subscription_handler(unsigned int, unsigned int);
74   
75   void init(void);
76   template <typename Transmitter>
77   int RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);
78
79   template<typename Transmitter>
80   int RequestSubscriptionDelete(subscription_helper  &, subscription_response_helper &, int , Transmitter &&);
81
82
83   void  Response(int, unsigned char *, int);
84   int const get_request_status(int);
85   subscription_response_helper * const get_subscription(int);
86
87   unsigned int get_next_id(void);
88   void set_timeout(unsigned int);
89   void set_num_retries(unsigned int);
90   
91   bool is_subscription_entry(int); 
92   bool is_request_entry(int);
93
94   void clear(void);
95   size_t  num_pending(void) const;
96   size_t  num_complete(void) const ;
97
98
99   
100 private:
101
102   bool add_request_entry(int, int);
103   bool set_request_status(int, int);
104   bool delete_request_entry(int);
105  
106   bool get_subscription_entry(int, int);
107   bool add_subscription_entry(int, subscription_response_helper &he);
108   bool delete_subscription_entry(int);
109   
110   std::unordered_map<int, int> requests_table;
111   std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
112   
113   std::unique_ptr<std::mutex> _data_lock;
114   std::unique_ptr<std::condition_variable> _cv;
115
116   std::chrono::seconds _time_out;
117   unsigned int _num_retries = 2;
118   unsigned int unique_request_id = 0;
119   
120 };
121
122 template <typename Transmitter>
123 int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
124   
125   int res;
126   unsigned char buffer[512];
127   size_t buf_len = 512;
128
129   // get a new unique request id ...
130   unsigned int new_req_id = get_next_id();
131   mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
132   he.set_request(new_req_id, he.get_req_seq());
133   
134
135   subscription_request e2ap_sub_req;
136   
137   // generate the request pdu
138   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
139   if(! res){
140     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
141     return SUBSCR_ERR_ENCODE;
142   }
143   
144   // put entry in request table
145   {
146     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
147     res = add_request_entry(he.get_request_id(), request_pending);
148     if(! res){
149       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue",  __FILE__, __LINE__, he.get_request_id());
150       return SUBSCR_ERR_UNKNOWN;
151     }
152   }
153
154   // acquire lock ...
155   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
156
157
158   // Send the message
159   res = tx(TxCode,  buf_len, buffer);
160   if (!res){
161     // clear state
162     delete_request_entry(he.get_request_id());
163     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__,  he.get_request_id());
164     return SUBSCR_ERR_TX;
165   };
166
167   
168   // record time stamp ..
169   auto start = std::chrono::system_clock::now();
170   res = SUBSCR_ERR_UNKNOWN;
171   
172   while(1){
173
174
175     // release lock and wait to be woken up
176     _cv.get()->wait_for(_local_lock, _time_out);
177     
178     // we have woken and acquired data_lock 
179     // check status and return appropriate object
180     
181     int status = get_request_status(he.get_request_id());
182     
183     if (status == request_success){
184
185       // retreive  & store the subscription response 
186       response = subscription_responses[he.get_request_id()];
187       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
188       res = SUBSCR_SUCCESS;
189       break;
190     }
191     
192     if (status == request_pending){
193
194       // woken up spuriously or timed out 
195       auto end = std::chrono::system_clock::now();
196       std::chrono::duration<double> f = end - start;
197       
198       if ( f > _num_retries * _time_out){
199         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
200         res = SUBSCR_ERR_TIMEOUT;
201         std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl;
202         break;
203       }
204       else{
205         mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     
206         continue;
207       }
208     }
209
210     if(status == request_failed){
211       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d  got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
212       res = SUBSCR_ERR_FAIL;
213       break;
214     }
215
216     // if we are here, some spurious
217     // status obtained or request failed . we return false
218     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
219     res = SUBSCR_ERR_UNKNOWN;
220     break;
221   };
222
223   delete_request_entry(he.get_request_id());
224   
225   // release data lock
226   _local_lock.unlock();
227   std::cout <<"Returning  res = " << res << " for " << he.get_request_id() << std::endl;  
228   return res;
229 };
230
231
232 template <typename Transmitter>
233 int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
234   
235   int res;
236
237   // First check if we have this subscription
238   if(! is_subscription_entry(he.get_request_id())){
239     mdclog_write(MDCLOG_ERR, "subscription with id %d  does not exist. Cannot be deleted",  he.get_request_id());       
240     return SUBSCR_ERR_MISSING;
241   }  
242   
243   // Also check if such a request is queued
244   if (is_request_entry(he.get_request_id())){
245     mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %d  already in queue",  he.get_request_id());
246     return SUBSCR_ERR_UNKNOWN;
247   }
248
249   subscription_delete e2ap_sub_req_del;
250   
251   // generate the delete request pdu
252   unsigned char buffer[128];
253   size_t buf_len = 128;
254   
255   res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
256   if(! res){
257     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
258     return SUBSCR_ERR_ENCODE;
259   }
260   
261   // put entry in request table
262   res = add_request_entry(he.get_request_id(), delete_request_pending);
263   if(! res){
264     mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
265     return SUBSCR_ERR_UNKNOWN;
266   }
267   
268   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
269
270   // Send the message
271   res = tx(TxCode,  buf_len, buffer);
272
273   if (!res){
274     delete_request_entry(he.get_request_id());
275     mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
276     return SUBSCR_ERR_TX;
277   };
278
279   
280   // record time stamp ..
281   auto start = std::chrono::system_clock::now();
282
283   res = SUBSCR_ERR_UNKNOWN;
284   while(1){
285     
286     // wait to be woken up
287     _cv.get()->wait_for(_local_lock, _time_out);
288     
289     // check status and return appropriate object
290     int status = get_request_status(he.get_request_id());
291     if (status == delete_request_success){
292       mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
293       res = SUBSCR_SUCCESS;
294       break;
295     }
296     
297     if (status == delete_request_pending){
298       // woken up spuriously or timed out 
299       auto end = std::chrono::system_clock::now();
300       std::chrono::duration<double> f = end - start;
301       
302       if (f > _num_retries * _time_out){
303         mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
304         res = SUBSCR_ERR_TIMEOUT;
305         break;
306       }
307       else{
308         mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); 
309       }
310       
311       continue;
312     }
313
314     if(status == delete_request_failed){
315       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d  got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
316       res = SUBSCR_ERR_FAIL;
317       break;
318     }
319     // if we are here, some spurious
320     // status obtained. we return false
321     
322     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
323     res =  SUBSCR_ERR_UNKNOWN;
324     break;
325
326   };
327
328   delete_request_entry(he.get_request_id());
329   
330   // release data lock
331   _local_lock.unlock();
332   std::cout <<"Returning  res = " << res << " for " << he.get_request_id() << std::endl;
333   return res;
334 };
335
336 #endif