1da8adc01f581cb3be9277bcbbb13c592652f6b1
[ric-app/bouncer.git] / Bouncer / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2019-2020 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <algorithm>
35 #include <ctime>
36 #include <unistd.h>
37 #include <chrono>
38 #include <tuple>
39 #include <rmr/RIC_message_types.h>
40
41 #include "subscription_delete_request.hpp"
42 #include "subscription_delete_response.hpp"
43 #include "subscription_request.hpp"
44 #include "subscription_response.hpp"
45
46 #define SUBSCR_SUCCESS 1
47 #define SUBSCR_ERR_TX -1
48 #define SUBSCR_ERR_TIMEOUT -2
49 #define SUBSCR_ERR_FAIL -3
50 #define SUBSCR_ERR_UNKNOWN -4
51 #define SUBSCR_ERR_DUPLICATE -5
52 #define SUBSCR_ERR_NOT_FOUND -6
53 using namespace std;
54
55 class TransmitterBase
56 {
57 public:
58     virtual ~TransmitterBase() {}
59
60     template<class T>
61     const T& getParam() const; //to be implemented after Parameter
62
63     template<class T, class U>
64     void setParam(const U& rhs); //to be implemented after Parameter
65 };
66
67 template <typename T>
68 class Transmitter : public TransmitterBase
69 {
70 public:
71         Transmitter(const T& tx) :obj(tx) {}
72     const T& getParam() const {return obj;}
73     void setParam(const T& tx) {obj=tx;}
74 private:
75     T obj;
76 };
77
78 //Here's the trick: dynamic_cast rather than virtual
79 template<class T> const T& TransmitterBase::getParam() const
80 {
81         return dynamic_cast<const Transmitter<T>&>(*this).getParam();
82 }
83 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
84 {
85         dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
86         return;
87 }
88
89 typedef enum {
90     request_pending = 1,
91     request_success,
92     request_failed,
93     request_duplicate
94 }Subscription_Status_Types;
95
96
97 using transaction_identifier = std::string;
98 using transaction_status = Subscription_Status_Types;
99
100 class SubscriptionHandler {
101                             
102 public:
103
104   SubscriptionHandler(unsigned int timeout_seconds = 30);
105   
106   template <typename AppTransmitter>
107   int manage_subscription_request(transaction_identifier, AppTransmitter &&);
108
109   template <typename AppTransmitter>
110   int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
111
112   void manage_subscription_response(int message_type, transaction_identifier id);
113
114   int  get_request_status(transaction_identifier);
115   bool set_request_status(transaction_identifier, transaction_status);
116   bool is_request_entry(transaction_identifier);
117   void set_timeout(unsigned int);
118   void clear(void);
119   void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
120
121   void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}};
122
123 private:
124   
125   bool add_request_entry(transaction_identifier, transaction_status);
126   bool delete_request_entry(transaction_identifier);
127
128   template <typename AppTransmitter>
129   bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
130
131   std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
132   std::unordered_map<transaction_identifier, transaction_status> status_table;
133
134   std::unique_ptr<std::mutex> _data_lock;
135   std::unique_ptr<std::condition_variable> _cv;
136
137   std::chrono::seconds _time_out;
138   
139   bool _ignore_subs_resp = false;
140 };
141
142 template <typename AppTransmitter>
143 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
144
145         mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s", id.c_str());
146
147
148   // add entry in hash table if it does not exist
149   auto search = trans_table.find(id);
150   if(search != trans_table.end()){
151     return false;
152   }
153
154   Transmitter<AppTransmitter> tptr(trans);
155   trans_table[id] = tptr;
156   return true;
157
158 };
159
160 //this will work for both sending subscription request and subscription delete request.
161 //The handler is oblivious of the message content and follows the transaction id.
162 template<typename AppTransmitter>
163 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
164         int res;
165   // put entry in request table
166   {
167     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
168     res = add_request_entry(rmr_trans_id, request_pending);
169     if(! res){
170                 
171       mdclog_write(MDCLOG_ERR, "%s : Error adding new subscription request %s to queue because request with identical key already present",  __FILE__, __LINE__);
172           
173       return SUBSCR_ERR_DUPLICATE;
174     }
175   }
176
177
178   // acquire lock ...
179   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
180   // Send the message
181   bool flg = tx();
182   if (!flg){
183     // clear state
184     delete_request_entry(rmr_trans_id);
185     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str());
186     return SUBSCR_ERR_TX;
187   } else {
188           mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id  %s", __FILE__, __LINE__, rmr_trans_id.c_str());
189           add_transmitter_entry(rmr_trans_id, tx);
190
191   }
192
193   // record time stamp ..
194   auto start = std::chrono::system_clock::now();
195   std::chrono::milliseconds t_out(_time_out);
196
197   //the wait functionality has been removed.
198
199
200   _local_lock.unlock();
201   // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
202    return res;
203 };
204
205 template<typename AppTransmitter>
206 int SubscriptionHandler:: manage_subscription_delete_request(transaction_identifier rmr_trans_id, AppTransmitter && tx)
207 {
208         int res;
209         // delete  entry in request table
210         {
211                 std::lock_guard<std::mutex> lock(*(_data_lock.get()));
212                 res = delete_request_entry(rmr_trans_id);
213                 mdclog_write(MDCLOG_INFO,"res=%d",res);
214                 if(! res)
215                 {
216                         mdclog_write(MDCLOG_ERR, "%s : Error deleting new subscription request %s from queue because request with key doesn't present",  __FILE__, __LINE__);
217
218                         return SUBSCR_ERR_NOT_FOUND;
219                 }
220
221         }
222
223
224         // acquire lock ...
225         std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
226         // Send the message
227         bool flg = tx();
228
229         if (!flg)
230         {
231                 // add state
232                 res = add_request_entry(rmr_trans_id, request_pending);
233                 mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription delete request %s", __FILE__, __LINE__, rmr_trans_id.c_str());
234                 return SUBSCR_ERR_TX;
235         } 
236         else 
237         {
238                 mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription delete request for trans_id  %s", __FILE__, __LINE__, rmr_trans_id.c_str());
239
240         }
241
242         // record time stamp ..
243         auto start = std::chrono::system_clock::now();
244         std::chrono::milliseconds t_out(_time_out);
245
246         //the wait functionality has been removed.
247
248
249         _local_lock.unlock();
250         // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
251         return res;
252 };
253 #endif