Initial commit of Admission Control xAPP and E2AP/X2AP definitions
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <mdclog/mdclog.h>
30 #include <mutex>
31 #include <condition_variable>
32 #include <unordered_map>
33 #include <chrono>
34
35 #include <subscription_request.hpp>
36 #include <subscription_response.hpp>
37 #include <subscription_delete_request.hpp>
38 #include <subscription_delete_response.hpp>
39
40 using namespace std;
41
42 typedef enum {
43     request_pending = 1,
44     request_success,
45     request_failed,
46     delete_request_pending,
47     delete_request_success,
48     delete_request_failed,
49     request_duplicate
50 }Subscription_Status_Types;
51
52
53 /* Class to process subscription related messages 
54    each subscription request is assigned a unique internally
55 generated request id for tracking purposes. this is because
56 the e2 subscription request does not carry any gnodeb id information
57
58 */
59
60 class SubscriptionHandler {
61
62 public:
63   SubscriptionHandler(void);
64   SubscriptionHandler(unsigned int, unsigned int);
65   
66   void init(void);
67   template <typename Transmitter>
68   bool RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);
69
70   template<typename Transmitter>
71   bool RequestSubscriptionDelete(subscription_helper  &, subscription_response_helper &, int , Transmitter &&);
72
73
74   void  Response(int, unsigned char *, int);
75   int const get_request_status(int);
76   subscription_response_helper * const get_subscription(int);
77
78   unsigned int get_next_id(void);
79   void set_timeout(unsigned int);
80   void set_timeout_flag(bool);
81   void set_num_retries(unsigned int);
82   
83   bool is_subscription_entry(int); 
84   bool is_request_entry(int);
85
86   void clear(void);
87   size_t  num_pending(void) const;
88   size_t  num_complete(void) const ;
89
90
91   
92 private:
93
94   void ProcessSubscriptionResponse(unsigned char *, int len);
95   void ProcessSubscriptionFailure(unsigned char *, int len);
96
97   void ProcessDeleteResponse(unsigned char *, int len);
98   void ProcessSubscriptionDeleteFailure(unsigned char *, int len);
99
100   bool add_request_entry(int, int);
101   bool set_request_status(int, int);
102   bool delete_request_entry(int);
103  
104   bool get_subscription_entry(int, int);
105   bool add_subscription_entry(int, subscription_response_helper &he);
106   bool delete_subscription_entry(int);
107   
108   std::unordered_map<int, int> requests_table;
109   std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
110   
111   std::unique_ptr<std::mutex> _data_lock;
112   std::unique_ptr<std::condition_variable> _cv;
113
114   std::chrono::seconds _time_out;
115   unsigned int _num_retries = 2;
116   bool _time_out_flag = true;
117   unsigned int unique_request_id = 0;
118   
119 };
120
121 template <typename Transmitter>
122 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
123   
124   bool res;
125   unsigned char buffer[512];
126   size_t buf_len = 512;
127
128   // get a new unique request id ...
129   unsigned int new_req_id = get_next_id();
130   std::cout <<"Using id = " << new_req_id << std::endl;
131   he.set_request(new_req_id, he.get_req_seq());
132   
133   E2AP_PDU_t *e2ap_pdu = 0;
134   subscription_request e2ap_sub_req;
135   
136   // generate the request pdu
137   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);
138   if(! res){
139     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);
140     return false;
141   }
142   
143   // put entry in request table
144   {
145     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
146     res = add_request_entry(he.get_request_id(), request_pending);
147     if(! res){
148       mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
149       return false;
150     }
151   }
152
153   // acquire lock ...
154   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
155
156
157   
158   // Send the message
159   res = tx(TxCode,  buf_len, buffer);
160   if (!res){
161     // clear state
162     delete_request_entry(he.get_request_id());
163     mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());
164     return false;
165   };
166
167   
168   // record time stamp ..
169   auto start = std::chrono::system_clock::now();
170   
171   while(1){
172
173
174       
175     // wait to be woken up
176     _cv.get()->wait_for(_local_lock, _time_out);
177     
178     // we have woken and acquired data_lock 
179     // check status and return appropriate object
180     
181     int status = get_request_status(he.get_request_id());
182     
183     if (status == request_success){
184       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
185       break;
186     }
187     
188     if (status == request_pending){
189       // woken up spuriously or timed out 
190       auto end = std::chrono::system_clock::now();
191       std::chrono::duration<double> f = end - start;
192       
193       if (_time_out_flag && f > _num_retries * _time_out){
194         delete_request_entry(he.get_request_id());
195         mdclog_write(MDCLOG_ERR, "Subscription request %d timed out waiting for response ", he.get_request_id());
196
197         // Release data lock
198         _local_lock.unlock();
199         return false;
200       }
201       else{
202         mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     
203         continue;
204       }
205     }
206
207     // if we are hear, some spurious
208     // status obtained or request failed . we return false
209     delete_request_entry(he.get_request_id());
210     
211     // release data lock
212     _local_lock.unlock();
213
214     return false;
215     
216   };
217              
218   // retreive the subscription response and clear queue
219   response = subscription_responses[he.get_request_id()];
220   delete_request_entry(he.get_request_id());
221
222   // release data lock
223   _local_lock.unlock();
224   
225   return true;
226 };
227
228
229 template <typename Transmitter>
230 bool SubscriptionHandler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
231   
232   bool res;
233
234   // First check if we have this subscription
235   if(! is_subscription_entry(he.get_request_id())){
236     mdclog_write(MDCLOG_ERR, "subscription with id %d  does not exist. Cannot be deleted",  he.get_request_id());       
237     return false;
238   }  
239   
240   // Also check if such a request is queued
241   if (is_request_entry(he.get_request_id())){
242     mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %d  already in queue",  he.get_request_id());
243     return false;
244   }
245
246   subscription_delete e2ap_sub_req_del;
247   
248   // generate the delete request pdu
249   unsigned char buffer[128];
250   size_t buf_len = 128;
251   
252   res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
253   if(! res){
254     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
255     return false;
256   }
257   
258   // put entry in request table
259   res = add_request_entry(he.get_request_id(), delete_request_pending);
260   if(! res){
261     mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
262     return false;
263   }
264   
265   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
266
267   // Send the message
268   res = tx(TxCode,  buf_len, buffer);
269
270   if (!res){
271     delete_request_entry(he.get_request_id());
272     mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
273     return false;
274   };
275
276   
277   // record time stamp ..
278   auto start = std::chrono::system_clock::now();
279
280   while(1){
281     
282     // wait to be woken up
283     _cv.get()->wait_for(_local_lock, _time_out);
284     
285     // check status and return appropriate object
286     int status = get_request_status(he.get_request_id());
287     if (status == delete_request_success){
288       delete_request_entry(he.get_request_id());
289       mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
290       _local_lock.unlock();
291       break;
292     }
293     
294     if (status == delete_request_pending){
295       // woken up spuriously or timed out 
296       auto end = std::chrono::system_clock::now();
297       std::chrono::duration<double> f = end - start;
298       
299       if (_time_out_flag && f > _num_retries * _time_out){
300         delete_request_entry(he.get_request_id());
301         mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
302         _local_lock.unlock();
303         return false;
304       }
305       else{
306         mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); 
307       }
308       
309       continue;
310     }
311
312     // if we are hear, some spurious
313     // status obtained. we return false
314     
315     delete_request_entry(he.get_request_id());
316     _local_lock.unlock();
317     return false;
318
319   };
320
321   return true;
322 };
323
324 #endif