Incorporating A1 HealthCheck functionality
[ric-app/hw.git] / src / xapp-mgmt / subs_mgmt.hpp
1 /*
2 ==================================================================================
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18 /*
19  * subs_mgmt.hpp
20  * Created on: 2019
21  * Author: Ashwin Shridharan, Shraboni Jana
22  */
23
24 #pragma once
25
26 #ifndef SUBSCRIPTION_HANDLER
27 #define SUBSCRIPTION_HANDLER
28
29 #include <functional>
30 #include <mdclog/mdclog.h>
31 #include <mutex>
32 #include <condition_variable>
33 #include <unordered_map>
34 #include <chrono>
35 #include <tuple>
36
37 #include "../xapp-formats/e2ap/subscription_delete_request.hpp"
38 #include "../xapp-formats/e2ap/subscription_delete_response.hpp"
39 #include "../xapp-formats/e2ap/subscription_request.hpp"
40 #include "../xapp-formats/e2ap/subscription_response.hpp"
41
42 #define SUBSCR_SUCCESS 0
43 #define SUBSCR_ERR_TX 1
44 #define SUBSCR_ERR_TIMEOUT 2
45 #define SUBSCR_ERR_FAIL 3
46 #define SUBSCR_ERR_UNKNOWN 4
47 #define SUBSCR_ERR_DUPLICATE 5
48 #define SUBSCR_ERR_ENCODE  6
49 #define SUBSCR_ERR_MISSING 7
50
51 using namespace std;
52
53 typedef enum {
54     request_pending = 1,
55     request_success,
56     request_failed,
57     delete_request_pending,
58     delete_request_success,
59     delete_request_failed,
60     request_duplicate
61 }Subscription_Status_Types;
62
63 using subscription_identifier = std::string; //here subscription_identifier is the rmr transaction id.
64
65 class SubscriptionHandler {
66                             
67 public:
68
69         SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
70   
71   void init(void);
72
73   template <typename Transmitter>
74   int request_subscription(std::string, Transmitter &&);
75
76   template<typename Transmitter>
77   int request_subscription_delete(std::string,  int ,  Transmitter &&);
78
79   void  Response(int, unsigned char *, int, const char *);
80   int const get_request_status(subscription_identifier);
81   subscription_response_helper * const get_subscription(subscription_identifier);
82   
83   unsigned int get_next_id(void);
84   void set_timeout(unsigned int);
85   void set_num_retries(unsigned int);
86   
87   bool is_subscription_entry(subscription_identifier); 
88   bool is_request_entry(subscription_identifier);
89   void get_subscription_keys(std::vector<subscription_identifier> &);
90   void clear(void);
91   size_t  num_pending(void) const;
92   size_t  num_complete(void) const ;
93
94
95   
96 private:
97
98   
99   bool add_request_entry(subscription_identifier, int);
100   bool set_request_status(subscription_identifier, int);
101   bool delete_request_entry(subscription_identifier);
102  
103   bool get_subscription_entry(subscription_identifier);
104   bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
105   bool delete_subscription_entry(subscription_identifier);
106
107   std::unordered_map<subscription_identifier, int> requests_table;
108   
109   std::unique_ptr<std::mutex> _data_lock;
110   std::unique_ptr<std::condition_variable> _cv;
111
112   std::chrono::seconds _time_out;
113   unsigned int _num_retries = 2;
114   unsigned int unique_request_id = 0;
115   
116 };
117
118 template <typename Transmitter>
119 int SubscriptionHandler::request_subscription(std::string rmr_trans_id, Transmitter && tx){
120   bool res;
121
122   // put entry in request table
123   {
124     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
125     res = add_request_entry(rmr_trans_id, request_pending);
126     if(! res){
127       mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present",  __FILE__, __LINE__, rmr_trans_id);
128       return SUBSCR_ERR_DUPLICATE;
129     }
130   }
131
132   // acquire lock ...
133   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
134
135   // Send the message
136   res = tx();
137
138   if (!res){
139     // clear state
140     delete_request_entry(rmr_trans_id);
141     mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
142     return SUBSCR_ERR_TX;
143   };
144
145
146   // record time stamp ..
147   auto start = std::chrono::system_clock::now();
148   res = SUBSCR_ERR_UNKNOWN;
149
150   while(1){
151     // release lock and wait to be woken up
152     _cv.get()->wait_for(_local_lock, _time_out);
153
154     // we have woken and acquired data_lock
155     // check status and return appropriate object
156
157     int status = get_request_status(rmr_trans_id);
158
159     if (status == request_success){
160       // retreive  & store the subscription response (why?)
161       // response = subscription_responses[sub_id];
162       mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", rmr_trans_id);
163       res = SUBSCR_SUCCESS;
164       break;
165     }
166
167     if (status == request_pending){
168       // woken up spuriously or timed out
169       auto end = std::chrono::system_clock::now();
170       std::chrono::duration<double> f = end - start;
171
172       if ( f > _num_retries * _time_out){
173         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
174         res = SUBSCR_ERR_TIMEOUT;
175         break;
176       }
177       else{
178         mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response ....", rmr_trans_id);
179         continue;
180       }
181     }
182
183     if(status == request_failed){
184       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s  got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
185       res = SUBSCR_ERR_FAIL;
186       break;
187     }
188
189     if (status == request_duplicate){
190       mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
191       res = SUBSCR_ERR_DUPLICATE;
192       break;
193
194     }
195
196     // if we are here, some spurious
197     // status obtained or request failed . we return appropriate error code
198     mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
199     res = SUBSCR_ERR_UNKNOWN;
200     break;
201   };
202
203   delete_request_entry(rmr_trans_id);
204
205   // release data lock
206   _local_lock.unlock();
207   std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
208   return res;
209 };
210
211 #endif