c50082780cf3d81caf37aec227afbc8a6bb02aa2
[ric-app/bouncer.git] / Bouncer / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2019-2020 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <algorithm>
35 #include <ctime>
36 #include <unistd.h>
37 #include <chrono>
38 #include <tuple>
39 #include <rmr/RIC_message_types.h>
40
41 #include "subscription_delete_request.hpp"
42 #include "subscription_delete_response.hpp"
43 #include "subscription_request.hpp"
44 #include "subscription_response.hpp"
45
46 #define SUBSCR_SUCCESS 1
47 #define SUBSCR_ERR_TX -1
48 #define SUBSCR_ERR_TIMEOUT -2
49 #define SUBSCR_ERR_FAIL -3
50 #define SUBSCR_ERR_UNKNOWN -4
51 #define SUBSCR_ERR_DUPLICATE -5
52
53 using namespace std;
54
55 class TransmitterBase
56 {
57 public:
58     virtual ~TransmitterBase() {}
59
60     template<class T>
61     const T& getParam() const; //to be implemented after Parameter
62
63     template<class T, class U>
64     void setParam(const U& rhs); //to be implemented after Parameter
65 };
66
67 template <typename T>
68 class Transmitter : public TransmitterBase
69 {
70 public:
71         Transmitter(const T& tx) :obj(tx) {}
72     const T& getParam() const {return obj;}
73     void setParam(const T& tx) {obj=tx;}
74 private:
75     T obj;
76 };
77
78 //Here's the trick: dynamic_cast rather than virtual
79 template<class T> const T& TransmitterBase::getParam() const
80 {
81         return dynamic_cast<const Transmitter<T>&>(*this).getParam();
82 }
83 template<class T, class U> void TransmitterBase::setParam(const U& rhs)
84 {
85         dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
86         return;
87 }
88
89 typedef enum {
90     request_pending = 1,
91     request_success,
92     request_failed,
93     request_duplicate
94 }Subscription_Status_Types;
95
96
97 using transaction_identifier = std::string;
98 using transaction_status = Subscription_Status_Types;
99
100 class SubscriptionHandler {
101                             
102 public:
103
104   SubscriptionHandler(unsigned int timeout_seconds = 30);
105   
106   template <typename AppTransmitter>
107   int manage_subscription_request(transaction_identifier, AppTransmitter &&);
108
109   template <typename AppTransmitter>
110   int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
111
112   void manage_subscription_response(int message_type, transaction_identifier id);
113
114   int  get_request_status(transaction_identifier);
115   bool set_request_status(transaction_identifier, transaction_status);
116   bool is_request_entry(transaction_identifier);
117   void set_timeout(unsigned int);
118   void clear(void);
119   void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
120
121   void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}};
122
123 private:
124   
125   bool add_request_entry(transaction_identifier, transaction_status);
126   bool delete_request_entry(transaction_identifier);
127
128   template <typename AppTransmitter>
129   bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
130
131   std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
132   std::unordered_map<transaction_identifier, transaction_status> status_table;
133
134   std::unique_ptr<std::mutex> _data_lock;
135   std::unique_ptr<std::condition_variable> _cv;
136
137   std::chrono::seconds _time_out;
138   
139   bool _ignore_subs_resp = false;
140 };
141
142 template <typename AppTransmitter>
143 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
144           mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str());
145
146   // add entry in hash table if it does not exist
147   auto search = trans_table.find(id);
148   if(search != trans_table.end()){
149     return false;
150   }
151
152   Transmitter<AppTransmitter> tptr(trans);
153   trans_table[id] = tptr;
154   return true;
155
156 };
157
158 //this will work for both sending subscription request and subscription delete request.
159 //The handler is oblivious of the message content and follows the transaction id.
160 template<typename AppTransmitter>
161 int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
162         int res;
163   // put entry in request table
164   {
165     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
166
167     res = add_request_entry(rmr_trans_id, request_pending);
168     if(! res){
169       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present",  __FILE__, __LINE__, rmr_trans_id);
170       return SUBSCR_ERR_DUPLICATE;
171     }
172   }
173
174
175   // acquire lock ...
176   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
177
178   // Send the message
179   bool flg = tx();
180
181   if (!flg){
182     // clear state
183     delete_request_entry(rmr_trans_id);
184     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
185     return SUBSCR_ERR_TX;
186   } else {
187           mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str() );
188           add_transmitter_entry(rmr_trans_id, tx);
189
190   }
191
192   // record time stamp ..
193   auto start = std::chrono::system_clock::now();
194   std::chrono::milliseconds t_out(_time_out);
195
196   //the wait functionality has been removed.
197
198
199   _local_lock.unlock();
200   // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
201    return res;
202 };
203
204 #endif