5673308c469e471483dd547d2a68fe7e53cc266a
[ric-app/hw.git] / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2019-2020 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <chrono>
35 #include <tuple>
36
37 #include "../xapp-asn/e2ap/subscription_delete_request.hpp"
38 #include "../xapp-asn/e2ap/subscription_delete_response.hpp"
39 #include "../xapp-asn/e2ap/subscription_request.hpp"
40 #include "../xapp-asn/e2ap/subscription_response.hpp"
41
42 #define SUBSCR_SUCCESS 1
43 #define SUBSCR_ERR_TX -1
44 #define SUBSCR_ERR_TIMEOUT -2
45 #define SUBSCR_ERR_FAIL -3
46 #define SUBSCR_ERR_UNKNOWN -4
47 #define SUBSCR_ERR_DUPLICATE -5
48
49 using namespace std;
50
51 class TransmitterBase
52 {
53 public:
54     virtual ~TransmitterBase() {}
55
56     template<class T>
57     const T& getParam() const; //to be implemented after Parameter
58
59     template<class T, class U>
60     void setParam(const U& rhs); //to be implemented after Parameter
61 };
62
63 template <typename T>
64 class Transmitter : public TransmitterBase
65 {
66 public:
67         Transmitter(const T& tx) :obj(tx) {}
68     const T& getParam() const {return obj;}
69     void setParam(const T& tx) {obj=tx;}
70 private:
71     T obj;
72 };
73
74 //Here's the trick: dynamic_cast rather than virtual
75 template<class T> const T& TransmitterBase::getParam() const
76 {
77         return dynamic_cast<const Transmitter<T>&>(*this).getParam();
78 }
79 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
80 {
81         dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
82         return;
83 }
84
85 typedef enum {
86     request_pending = 1,
87     request_success,
88     request_failed,
89     delete_request_pending,
90     delete_request_success,
91     delete_request_failed,
92     request_duplicate
93 }Subscription_Status_Types;
94
95 using transaction_identifier = unsigned char*;
96 using transaction_status = Subscription_Status_Types;
97
98 class SubscriptionHandler {
99                             
100 public:
101
102   SubscriptionHandler(unsigned int timeout_seconds = 10);
103   
104   template <typename AppTransmitter>
105   int manage_subscription_request(transaction_identifier, AppTransmitter &&);
106
107   template <typename AppTransmitter>
108   int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
109
110   void manage_subscription_response(int message_type, transaction_identifier id);
111
112   int const get_request_status(transaction_identifier);
113   bool set_request_status(transaction_identifier, transaction_status);
114   bool is_request_entry(transaction_identifier);
115   void set_timeout(unsigned int);
116   void clear(void);
117   void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
118
119 private:
120   
121   bool add_request_entry(transaction_identifier, transaction_status);
122   bool delete_request_entry(transaction_identifier);
123
124   template <typename AppTransmitter>
125   bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
126
127   std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
128   std::unordered_map<transaction_identifier, transaction_status> status_table;
129
130   std::unique_ptr<std::mutex> _data_lock;
131   std::unique_ptr<std::condition_variable> _cv;
132
133   std::chrono::seconds _time_out;
134   
135   bool _ignore_subs_resp = false;
136 };
137
138 template <typename AppTransmitter>
139 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
140
141   // add entry in hash table if it does not exist
142   auto search = trans_table.find(id);
143   if(search != trans_table.end()){
144     return false;
145   }
146
147   Transmitter<AppTransmitter> tptr(trans);
148   trans_table[id] = tptr;
149   return true;
150
151 };
152
153 //this will work for both sending subscription request and subscription delete request.
154 //The handler is oblivious of the message content and follows the transaction id.
155 template<typename AppTransmitter>
156 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
157         int res;
158   // put entry in request table
159   {
160     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
161
162     res = add_request_entry(rmr_trans_id, request_pending);
163     if(! res){
164       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present",  __FILE__, __LINE__, rmr_trans_id);
165       return SUBSCR_ERR_DUPLICATE;
166     }
167   }
168
169
170   // acquire lock ...
171   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
172
173   // Send the message
174   bool flg = tx();
175
176   if (!flg){
177     // clear state
178     delete_request_entry(rmr_trans_id);
179     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
180     return SUBSCR_ERR_TX;
181   } else {
182           mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id );
183           add_transmitter_entry(rmr_trans_id, tx);
184
185   }
186
187   // record time stamp ..
188   auto start = std::chrono::system_clock::now();
189   res = SUBSCR_ERR_UNKNOWN;
190
191
192   while(1){
193           // release lock and wait to be woken up
194           _cv.get()->wait_for(_local_lock, _time_out);
195
196           // we have woken and acquired data_lock
197           // check status and return appropriate object
198           int status = get_request_status(rmr_trans_id);
199
200           if (status == request_success){
201                   mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id);
202                   res = SUBSCR_SUCCESS;
203                   break;
204           }
205
206           if (status == request_pending){
207                   // woken up spuriously or timed out
208                   auto end = std::chrono::system_clock::now();
209                   std::chrono::duration<double> f = end - start;
210
211                   if ( f > _time_out){
212
213                           mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
214
215                           //res = SUBSCR_ERR_TIMEOUT;
216                           //sunny side scenario. assuming subscription response is received.
217                           res = SUBSCR_SUCCESS;
218                           break;
219                   }
220                   else{
221                                  mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id);
222                                  continue;
223                   }
224
225           }
226
227           if(status == request_failed){
228                   mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s  got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
229                   res = SUBSCR_ERR_FAIL;
230                   break;
231           }
232
233           if (status == request_duplicate){
234                   mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
235                   res = SUBSCR_ERR_DUPLICATE;
236                   break;
237
238           }
239
240           // if we are here, some spurious
241           // status obtained or request failed . we return appropriate error code
242           mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
243           res = SUBSCR_ERR_UNKNOWN;
244           break;
245   };
246
247   delete_request_entry(rmr_trans_id);
248
249   // release data lock
250   _local_lock.unlock();
251  // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
252   return res;
253 };
254
255 #endif