bugfix for subscription
[ric-app/hw.git] / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2019-2020 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <chrono>
35 #include <tuple>
36 #include <string>
37
38 #include "../xapp-asn/e2ap/subscription_delete_request.hpp"
39 #include "../xapp-asn/e2ap/subscription_delete_response.hpp"
40 #include "../xapp-asn/e2ap/subscription_request.hpp"
41 #include "../xapp-asn/e2ap/subscription_response.hpp"
42
43 #define SUBSCR_SUCCESS 1
44 #define SUBSCR_ERR_TX -1
45 #define SUBSCR_ERR_TIMEOUT -2
46 #define SUBSCR_ERR_FAIL -3
47 #define SUBSCR_ERR_UNKNOWN -4
48 #define SUBSCR_ERR_DUPLICATE -5
49
50 using namespace std;
51
52 class TransmitterBase
53 {
54 public:
55     virtual ~TransmitterBase() {}
56
57     template<class T>
58     const T& getParam() const; //to be implemented after Parameter
59
60     template<class T, class U>
61     void setParam(const U& rhs); //to be implemented after Parameter
62 };
63
64 template <typename T>
65 class Transmitter : public TransmitterBase
66 {
67 public:
68         Transmitter(const T& tx) :obj(tx) {}
69     const T& getParam() const {return obj;}
70     void setParam(const T& tx) {obj=tx;}
71 private:
72     T obj;
73 };
74
75 //Here's the trick: dynamic_cast rather than virtual
76 template<class T> const T& TransmitterBase::getParam() const
77 {
78         return dynamic_cast<const Transmitter<T>&>(*this).getParam();
79 }
80 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
81 {
82         dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
83         return;
84 }
85
86 typedef enum {
87     request_pending = 1,
88     request_success,
89     request_failed,
90     delete_request_pending,
91     delete_request_success,
92     delete_request_failed,
93     request_duplicate
94 }Subscription_Status_Types;
95
96 using transaction_identifier = std::string;
97 using transaction_status = Subscription_Status_Types;
98
99 class SubscriptionHandler {
100                             
101 public:
102
103   SubscriptionHandler(unsigned int timeout_seconds = 10);
104   
105   template <typename AppTransmitter>
106   int manage_subscription_request(transaction_identifier, AppTransmitter &&);
107
108   template <typename AppTransmitter>
109   int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
110
111   void manage_subscription_response(int message_type, transaction_identifier id);
112
113   int const get_request_status(transaction_identifier);
114   bool set_request_status(transaction_identifier, transaction_status);
115   bool is_request_entry(transaction_identifier);
116   void set_timeout(unsigned int);
117   void clear(void);
118   void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
119
120 private:
121   
122   bool add_request_entry(transaction_identifier, transaction_status);
123   bool delete_request_entry(transaction_identifier);
124
125   template <typename AppTransmitter>
126   bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
127
128   std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
129   std::unordered_map<transaction_identifier, transaction_status> status_table;
130
131   std::unique_ptr<std::mutex> _data_lock;
132   std::unique_ptr<std::condition_variable> _cv;
133
134   std::chrono::seconds _time_out;
135   
136   bool _ignore_subs_resp = false;
137 };
138
139 template <typename AppTransmitter>
140 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
141
142   // add entry in hash table if it does not exist
143   auto search = trans_table.find(id);
144   if(search != trans_table.end()){
145     return false;
146   }
147
148   Transmitter<AppTransmitter> tptr(trans);
149   trans_table[id] = tptr;
150   return true;
151
152 };
153
154 //this will work for both sending subscription request and subscription delete request.
155 //The handler is oblivious of the message content and follows the transaction id.
156 template<typename AppTransmitter>
157 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
158         int res;
159   // put entry in request table
160   {
161     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
162
163     res = add_request_entry(rmr_trans_id, request_pending);
164     if(! res){
165       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present",  __FILE__, __LINE__, rmr_trans_id);
166       return SUBSCR_ERR_DUPLICATE;
167     }
168   }
169
170
171   // acquire lock ...
172   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
173
174   // Send the message
175   bool flg = tx();
176
177   if (!flg){
178     // clear state
179     delete_request_entry(rmr_trans_id);
180     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
181     return SUBSCR_ERR_TX;
182   } else {
183           mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
184           add_transmitter_entry(rmr_trans_id, tx);
185
186   }
187
188   // record time stamp ..
189   auto start = std::chrono::system_clock::now();
190   res = SUBSCR_ERR_UNKNOWN;
191
192
193   while(1){
194           // release lock and wait to be woken up
195           _cv.get()->wait_for(_local_lock, _time_out);
196
197           // we have woken and acquired data_lock
198           // check status and return appropriate object
199           int status = get_request_status(rmr_trans_id);
200
201           if (status == request_success){
202                   mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id.c_str());
203                   res = SUBSCR_SUCCESS;
204                   break;
205           }
206
207           if (status == request_pending){
208                   // woken up spuriously or timed out
209                   auto end = std::chrono::system_clock::now();
210                   std::chrono::duration<double> f = end - start;
211
212                   if ( f > _time_out){
213
214                           mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id.c_str());
215
216                           res = SUBSCR_ERR_TIMEOUT;
217                           //sunny side scenario. assuming subscription response is received.
218                           //res = SUBSCR_SUCCESS;
219                           break;
220                   }
221                   else{
222                                  mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id.c_str());
223                                  continue;
224                   }
225
226           }
227
228           if(status == request_failed){
229                   mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s  got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
230                   res = SUBSCR_ERR_FAIL;
231                   break;
232           }
233
234           if (status == request_duplicate){
235                   mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
236                   res = SUBSCR_ERR_DUPLICATE;
237                   break;
238
239           }
240
241           // if we are here, some spurious
242           // status obtained or request failed . we return appropriate error code
243           mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
244           res = SUBSCR_ERR_UNKNOWN;
245           break;
246   };
247
248   delete_request_entry(rmr_trans_id);
249
250   // release data lock
251   _local_lock.unlock();
252  // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
253   return res;
254 };
255
256 #endif