9252ad007cc65196459d370516487f618d74a988
[ric-app/kpimon.git] / src / E2AP-c / subscription / subscription_handler.hpp
1 /*\r
2 ==================================================================================\r
3         Copyright (c) 2018-2019 SAMSUNG Intellectual Property.\r
4 \r
5    Licensed under the Apache License, Version 2.0 (the "License");\r
6    you may not use this file except in compliance with the License.\r
7    You may obtain a copy of the License at\r
8 \r
9        http://www.apache.org/licenses/LICENSE-2.0\r
10 \r
11    Unless required by applicable law or agreed to in writing, software\r
12    distributed under the License is distributed on an "AS IS" BASIS,\r
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
14    See the License for the specific language governing permissions and\r
15    limitations under the License.\r
16 ==================================================================================\r
17 */\r
18 \r
19 #pragma once\r
20 \r
21 #ifndef SUBSCRIPTION_HANDLER\r
22 #define SUBSCRIPTION_HANDLER\r
23 \r
24 #include <mdclog/mdclog.h>\r
25 #include <mutex>\r
26 #include <condition_variable>\r
27 #include <unordered_map>\r
28 #include <chrono>\r
29 \r
30 #include <subscription_request.hpp>\r
31 #include <subscription_response.hpp>\r
32 \r
33 using namespace std;\r
34 \r
35 typedef enum {\r
36     request_pending = 1,\r
37     request_success,\r
38     request_failed,\r
39     request_duplicate\r
40 }Subscription_Status_Types;\r
41 \r
42 \r
43 /* Class to process subscription related messages \r
44    each subscription request is assigned a unique internally\r
45 generated request id for tracking purposes. this is because\r
46 the e2 subscription request does not carry any gnodeb id information\r
47 \r
48 */\r
49 \r
50 class SubscriptionHandler {\r
51 \r
52 public:\r
53   SubscriptionHandler(void);\r
54   SubscriptionHandler(unsigned int, unsigned int);\r
55   \r
56   void init(void);\r
57   template <typename Transmitter>\r
58   bool RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);\r
59 \r
60 \r
61   void  Response(int, unsigned char *, int);\r
62   int const get_request_status(int);\r
63   subscription_response_helper * const get_subscription(int);\r
64 \r
65   unsigned int get_next_id(void);\r
66   void set_timeout(unsigned int);\r
67   void set_timeout_flag(bool);\r
68   void set_num_retries(unsigned int);\r
69   \r
70   bool is_subscription_entry(int); \r
71   bool is_request_entry(int);\r
72 \r
73   void clear(void);\r
74   size_t  num_pending(void) const;\r
75   size_t  num_complete(void) const ;\r
76 \r
77 \r
78   \r
79 private:\r
80 \r
81   void ProcessSubscriptionResponse(unsigned char *, int len);\r
82   void ProcessSubscriptionFailure(unsigned char *, int len);\r
83 \r
84   bool add_request_entry(int, int);\r
85   bool set_request_status(int, int);\r
86   bool delete_request_entry(int);\r
87  \r
88   bool get_subscription_entry(int, int);\r
89   bool add_subscription_entry(int, subscription_response_helper &he);\r
90   bool delete_subscription_entry(int);\r
91   \r
92   std::unordered_map<int, int> requests_table;\r
93   std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions\r
94   \r
95   std::unique_ptr<std::mutex> _data_lock;\r
96   std::unique_ptr<std::condition_variable> _cv;\r
97 \r
98   std::chrono::seconds _time_out;\r
99   unsigned int _num_retries = 2;\r
100   bool _time_out_flag = true;\r
101   unsigned int unique_request_id = 0;\r
102   \r
103 };\r
104 \r
105 template <typename Transmitter>\r
106 bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){\r
107   \r
108   bool res;\r
109   unsigned char buffer[512];\r
110   size_t buf_len = 512;\r
111 \r
112   // get a new unique request id ...\r
113   unsigned int new_req_id = get_next_id();\r
114   mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);\r
115   he.set_request(new_req_id, he.get_req_seq());\r
116   \r
117   E2AP_PDU_t *e2ap_pdu = 0;\r
118   subscription_request e2ap_sub_req;\r
119   \r
120   // generate the request pdu\r
121   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);\r
122   if(! res){\r
123     mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);\r
124     return false;\r
125   }\r
126   \r
127   // put entry in request table\r
128   {\r
129     std::lock_guard<std::mutex> lock(*(_data_lock.get()));\r
130     res = add_request_entry(he.get_request_id(), request_pending);\r
131     if(! res){\r
132       mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());\r
133       return false;\r
134     }\r
135   }\r
136 \r
137   // acquire lock ...\r
138   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));\r
139 \r
140 \r
141   // Send the message\r
142   res = tx(TxCode,  buf_len, buffer);\r
143   if (!res){\r
144     // clear state\r
145     delete_request_entry(he.get_request_id());\r
146     mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());\r
147     return false;\r
148   };\r
149 \r
150   \r
151   // record time stamp ..\r
152   auto start = std::chrono::system_clock::now();\r
153   \r
154   while(1){\r
155 \r
156 \r
157     // release lock and wait to be woken up\r
158     _cv.get()->wait_for(_local_lock, _time_out);\r
159     \r
160     // we have woken and acquired data_lock \r
161     // check status and return appropriate object\r
162     \r
163     int status = get_request_status(he.get_request_id());\r
164     \r
165     if (status == request_success){\r
166       // retreive the subscription response and clear queue\r
167       response = subscription_responses[he.get_request_id()];\r
168 \r
169       // clear state\r
170       delete_request_entry(he.get_request_id());\r
171       \r
172       // release data lock\r
173       _local_lock.unlock();\r
174       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());\r
175  \r
176       break;\r
177     }\r
178     \r
179     if (status == request_pending){\r
180       // woken up spuriously or timed out \r
181       auto end = std::chrono::system_clock::now();\r
182       std::chrono::duration<double> f = end - start;\r
183       \r
184       if (_time_out_flag && f > _num_retries * _time_out){\r
185         delete_request_entry(he.get_request_id());\r
186         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());\r
187 \r
188         // Release data lock\r
189         _local_lock.unlock();\r
190         return false;\r
191       }\r
192       else{\r
193         mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     \r
194         continue;\r
195       }\r
196     }\r
197 \r
198     // if we are here, some spurious\r
199     // status obtained or request failed . we return false\r
200     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);\r
201     delete_request_entry(he.get_request_id());\r
202     \r
203     // release data lock\r
204     _local_lock.unlock();\r
205 \r
206     return false;\r
207     \r
208   };\r
209              \r
210   return true;\r
211 };\r
212 \r
213 #endif\r