Merge "Initial commit of source directory"
[ric-app/hw.git] / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <chrono>
35 #include <tuple>
36
37 #include "subscription_delete_request.hpp"
38 #include "subscription_delete_response.hpp"
39 #include "subscription_request.hpp"
40 #include "subscription_response.hpp"
41
42 #define SUBSCR_SUCCESS 0
43 #define SUBSCR_ERR_TX 1
44 #define SUBSCR_ERR_TIMEOUT 2
45 #define SUBSCR_ERR_FAIL 3
46 #define SUBSCR_ERR_UNKNOWN 4
47 #define SUBSCR_ERR_DUPLICATE 5
48 #define SUBSCR_ERR_ENCODE  6
49 #define SUBSCR_ERR_MISSING 7
50
51 using namespace std;
52
53 typedef enum {
54     request_pending = 1,
55     request_success,
56     request_failed,
57     delete_request_pending,
58     delete_request_success,
59     delete_request_failed,
60     request_duplicate
61 }Subscription_Status_Types;
62
63 using subscription_identifier = std::tuple<std::string , int>;
64
65 struct subscription_hasher {
66   size_t operator()(const subscription_identifier & key) const {
67     return  std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
68   }
69 };
70
71 class SubscriptionHandler {
72                             
73 public:
74
75         SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
76   
77   void init(void);
78
79   template <typename Transmitter>
80   int request_subscription(std::string, int ,   Transmitter &&);
81
82   template<typename Transmitter>
83   int request_subscription_delete(subscription_helper  &, subscription_response_helper &, std::string,  int ,  Transmitter &&);
84
85   void  Response(int, unsigned char *, int, const char *);
86   int const get_request_status(subscription_identifier);
87   subscription_response_helper * const get_subscription(subscription_identifier);
88   
89   unsigned int get_next_id(void);
90   void set_timeout(unsigned int);
91   void set_num_retries(unsigned int);
92   
93   bool is_subscription_entry(subscription_identifier); 
94   bool is_request_entry(subscription_identifier);
95   void get_subscription_keys(std::vector<subscription_identifier> &);
96   void clear(void);
97   size_t  num_pending(void) const;
98   size_t  num_complete(void) const ;
99
100
101   
102 private:
103
104   
105   bool add_request_entry(subscription_identifier, int);
106   bool set_request_status(subscription_identifier, int);
107   bool delete_request_entry(subscription_identifier);
108  
109   bool get_subscription_entry(subscription_identifier);
110   bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
111   bool delete_subscription_entry(subscription_identifier);
112   
113   std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
114   std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
115   
116   std::unique_ptr<std::mutex> _data_lock;
117   std::unique_ptr<std::condition_variable> _cv;
118
119   std::chrono::seconds _time_out;
120   unsigned int _num_retries = 2;
121   unsigned int unique_request_id = 0;
122   
123 };
124
125 template <typename Transmitter>
126 int SubscriptionHandler::request_subscription(std::string node_id, int msgcode, Transmitter && tx){
127
128   // generate subscription identifier
129   subscription_identifier sub_id = std::make_tuple (node_id, 0); //0 is the function id which is hardcoded, which should come from rnib
130
131
132   bool res;
133   // put entry in request table
134   {
135     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
136     res = add_request_entry(sub_id, request_pending);
137     if(! res){
138       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present",  __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
139       return SUBSCR_ERR_DUPLICATE;
140     }
141   }
142
143   // acquire lock ...
144   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
145
146
147   // Send the message
148   res = tx();
149
150   if (!res){
151     // clear state
152     delete_request_entry(sub_id);
153     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
154     return SUBSCR_ERR_TX;
155   };
156
157
158   // record time stamp ..
159   auto start = std::chrono::system_clock::now();
160   res = SUBSCR_ERR_UNKNOWN;
161
162   while(1){
163     // release lock and wait to be woken up
164     _cv.get()->wait_for(_local_lock, _time_out);
165
166     // we have woken and acquired data_lock
167     // check status and return appropriate object
168
169     int status = get_request_status(sub_id);
170
171     if (status == request_success){
172       // retreive  & store the subscription response (why?)
173       // response = subscription_responses[sub_id];
174       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
175       res = SUBSCR_SUCCESS;
176       break;
177     }
178
179     if (status == request_pending){
180       // woken up spuriously or timed out
181       auto end = std::chrono::system_clock::now();
182       std::chrono::duration<double> f = end - start;
183
184       if ( f > _num_retries * _time_out){
185         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
186         res = SUBSCR_ERR_TIMEOUT;
187         break;
188       }
189       else{
190         mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
191         continue;
192       }
193     }
194
195     if(status == request_failed){
196       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
197       res = SUBSCR_ERR_FAIL;
198       break;
199     }
200
201     if (status == request_duplicate){
202       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
203       res = SUBSCR_ERR_DUPLICATE;
204       break;
205
206     }
207
208     // if we are here, some spurious
209     // status obtained or request failed . we return appropriate error code
210     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
211     res = SUBSCR_ERR_UNKNOWN;
212     break;
213   };
214
215   delete_request_entry(sub_id);
216
217   // release data lock
218   _local_lock.unlock();
219   std::cout <<"Returning  res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," <<  std::get<1>(sub_id) << std::endl;
220   return res;
221 };
222
223
224 /*template <typename Transmitter>
225 int  SubscriptionHandler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id,  int TxCode, Transmitter && tx){
226
227   int res;
228   // generate subscription identifier
229   subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
230
231   // First check if we have this subscription
232   if(! is_subscription_entry(sub_id)){
233     mdclog_write(MDCLOG_ERR, "subscription with id %s, %d  does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
234     return SUBSCR_ERR_MISSING;
235   }
236
237   // Also check if such a request is queued
238   if (is_request_entry(sub_id)){
239     mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %s, %d  already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
240     return SUBSCR_ERR_DUPLICATE;
241   }
242
243   subscription_delete e2ap_sub_req_del;
244
245   // generate the delete request pdu
246   unsigned char buffer[128];
247   size_t buf_len = 128;
248
249   res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
250   if(! res){
251     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
252     return SUBSCR_ERR_ENCODE;
253   }
254
255   // put entry in request table
256   {
257     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
258     res = add_request_entry(sub_id, delete_request_pending);
259     if(!res){
260       mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate  subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
261       return SUBSCR_ERR_DUPLICATE;
262     }
263   }
264
265   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
266
267   // Send the message
268   res = tx(TxCode,  buf_len, buffer);
269
270   if (!res){
271     delete_request_entry(sub_id);
272     mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
273     return SUBSCR_ERR_TX;
274   };
275
276
277   // record time stamp ..
278   auto start = std::chrono::system_clock::now();
279
280   res = SUBSCR_ERR_UNKNOWN;
281   while(1){
282
283     // wait to be woken up
284     _cv.get()->wait_for(_local_lock, _time_out);
285
286     // check status and return appropriate object
287     int status = get_request_status(sub_id);
288     if (status == delete_request_success){
289       mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
290       res = SUBSCR_SUCCESS;
291       break;
292     }
293
294     if (status == delete_request_pending){
295       // woken up spuriously or timed out
296       auto end = std::chrono::system_clock::now();
297       std::chrono::duration<double> f = end - start;
298
299       if (f > _num_retries * _time_out){
300         mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
301         res = SUBSCR_ERR_TIMEOUT;
302         break;
303       }
304       else{
305         mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
306       }
307
308       continue;
309     }
310
311     if(status == delete_request_failed){
312       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
313       res = SUBSCR_ERR_FAIL;
314       break;
315     }
316
317     // if we are here, some spurious
318     // status obtained. we return false
319
320     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
321     res =  SUBSCR_ERR_UNKNOWN;
322     break;
323
324   };
325
326   delete_request_entry(sub_id);
327
328   // release data lock
329   _local_lock.unlock();
330   std::cout <<"Returning  res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
331   return res;
332 };*/
333
334 #endif