76c15467f5026a3d3b016ef598180556c1eac00d
[ric-plt/sdl.git] / src / syncstorageimpl.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include <chrono>
23 #include <sstream>
24 #include <sdl/asyncstorage.hpp>
25 #include <sdl/backenderror.hpp>
26 #include <sdl/errorqueries.hpp>
27 #include <sdl/invalidnamespace.hpp>
28 #include <sdl/notconnected.hpp>
29 #include <sdl/operationinterrupted.hpp>
30 #include <sdl/rejectedbybackend.hpp>
31 #include <sdl/rejectedbysdl.hpp>
32 #include "private/redis/asyncredisstorage.hpp"
33 #include "private/syncstorageimpl.hpp"
34 #include "private/system.hpp"
35
36 using namespace shareddatalayer;
37
38 namespace
39 {
40     void throwExceptionForErrorCode[[ noreturn ]](const std::error_code& ec)
41     {
42         if (ec == shareddatalayer::Error::BACKEND_FAILURE)
43             throw BackendError(ec.message());
44         else if (ec == shareddatalayer::Error::NOT_CONNECTED)
45             throw NotConnected(ec.message());
46         else if (ec == shareddatalayer::Error::OPERATION_INTERRUPTED)
47             throw OperationInterrupted(ec.message());
48         else if (ec == shareddatalayer::Error::REJECTED_BY_BACKEND)
49             throw RejectedByBackend(ec.message());
50         else if (ec == AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE)
51             throw InvalidNamespace(ec.message());
52         else if (ec == shareddatalayer::Error::REJECTED_BY_SDL)
53             throw RejectedBySdl(ec.message());
54
55         std::ostringstream os;
56         os << "No corresponding SDL exception found for error code: " << ec.category().name() << " " << ec.value();
57         throw std::range_error(os.str());
58     }
59 }
60
/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
 * instead of the current (somewhat error-prone) poll-based implementation.
 */
64
65 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
66     SyncStorageImpl(std::move(asyncStorage), System::getSystem())
67 {
68 }
69
70 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> pAsyncStorage,
71                                  System& system):
72     asyncStorage(std::move(pAsyncStorage)),
73     system(system),
74     localStatus(false),
75     synced(false),
76     isReady(false),
77     events{ asyncStorage->fd(), POLLIN, 0 },
78     operationTimeout(std::chrono::steady_clock::duration::zero())
79 {
80 }
81
82 void SyncStorageImpl::waitReadyAck(const std::error_code&)
83 {
84     isReady = true;
85 }
86
87 void SyncStorageImpl::modifyAck(const std::error_code& error)
88 {
89     synced = true;
90     localError = error;
91 }
92
93 void SyncStorageImpl::modifyIfAck(const std::error_code& error, bool status)
94 {
95     synced = true;
96     localError = error;
97     localStatus = status;
98 }
99
100 void SyncStorageImpl::getAck(const std::error_code& error, const DataMap& dataMap)
101 {
102     synced = true;
103     localError = error;
104     localMap = dataMap;
105 }
106
107 void SyncStorageImpl::findKeysAck(const std::error_code& error, const Keys& keys)
108 {
109     synced = true;
110     localError = error;
111     localKeys = keys;
112 }
113
114 void SyncStorageImpl::verifyBackendResponse()
115 {
116     if(localError)
117         throwExceptionForErrorCode(localError);
118 }
119
120 void SyncStorageImpl::waitForOperationCallback()
121 {
122     while(!synced)
123         pollAndHandleEvents(NO_TIMEOUT);
124 }
125
126 void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
127 {
128     if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
129         asyncStorage->handleEvents();
130 }
131
132 void SyncStorageImpl::waitForReadinessCheckCallback()
133 {
134     if (operationTimeout == std::chrono::steady_clock::duration::zero())
135     {
136         while (!isReady)
137             pollAndHandleEvents(NO_TIMEOUT);
138     }
139     else
140     {
141         int pollTimeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(operationTimeout).count() / 10;
142         std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
143         while(!isReady && (std::chrono::steady_clock::now() - start < operationTimeout))
144             pollAndHandleEvents(pollTimeout_ms);
145     }
146 }
147
148 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
149 {
150     isReady = false;
151     asyncStorage->waitReadyAsync(ns,
152                                  std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
153                                            this,
154                                            std::error_code()));
155     waitForReadinessCheckCallback();
156 }
157
158 void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
159 {
160     handlePendingEvents();
161     waitSdlToBeReady(ns);
162     synced = false;
163
164     asyncStorage->setAsync(ns,
165                            dataMap,
166                            std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
167                                      this,
168                                      std::placeholders::_1));
169     waitForOperationCallback();
170     verifyBackendResponse();
171 }
172
173 bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
174 {
175     handlePendingEvents();
176     waitSdlToBeReady(ns);
177     synced = false;
178     asyncStorage->setIfAsync(ns,
179                              key,
180                              oldData,
181                              newData,
182                              std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
183                                        this,
184                                        std::placeholders::_1,
185                                        std::placeholders::_2));
186     waitForOperationCallback();
187     verifyBackendResponse();
188     return localStatus;
189 }
190
191 bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
192 {
193     handlePendingEvents();
194     waitSdlToBeReady(ns);
195     synced = false;
196     asyncStorage->setIfNotExistsAsync(ns,
197                                       key,
198                                       data,
199                                       std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
200                                                 this,
201                                                 std::placeholders::_1,
202                                                 std::placeholders::_2));
203     waitForOperationCallback();
204     verifyBackendResponse();
205     return localStatus;
206 }
207
208 SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
209 {
210     handlePendingEvents();
211     waitSdlToBeReady(ns);
212     synced = false;
213     asyncStorage->getAsync(ns,
214                            keys,
215                            std::bind(&shareddatalayer::SyncStorageImpl::getAck,
216                                      this,
217                                      std::placeholders::_1,
218                                      std::placeholders::_2));
219     waitForOperationCallback();
220     verifyBackendResponse();
221     return localMap;
222 }
223
224 void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
225 {
226     handlePendingEvents();
227     waitSdlToBeReady(ns);
228     synced = false;
229     asyncStorage->removeAsync(ns,
230                               keys,
231                               std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
232                                         this,
233                                         std::placeholders::_1));
234     waitForOperationCallback();
235     verifyBackendResponse();
236 }
237
238 bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
239 {
240     handlePendingEvents();
241     waitSdlToBeReady(ns);
242     synced = false;
243     asyncStorage->removeIfAsync(ns,
244                                 key,
245                                 data,
246                                 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
247                                           this,
248                                           std::placeholders::_1,
249                                           std::placeholders::_2));
250     waitForOperationCallback();
251     verifyBackendResponse();
252     return localStatus;
253 }
254
255 SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
256 {
257     handlePendingEvents();
258     waitSdlToBeReady(ns);
259     synced = false;
260     asyncStorage->findKeysAsync(ns,
261                                 keyPrefix,
262                                 std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
263                                           this,
264                                           std::placeholders::_1,
265                                           std::placeholders::_2));
266     waitForOperationCallback();
267     verifyBackendResponse();
268     return localKeys;
269 }
270
271 void SyncStorageImpl::removeAll(const Namespace& ns)
272 {
273     handlePendingEvents();
274     waitSdlToBeReady(ns);
275     synced = false;
276     asyncStorage->removeAllAsync(ns,
277                                  std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
278                                            this,
279                                            std::placeholders::_1));
280     waitForOperationCallback();
281     verifyBackendResponse();
282 }
283
284 void SyncStorageImpl::handlePendingEvents()
285 {
286     int pollRetVal = system.poll(&events, 1, 0);
287
288     while (pollRetVal > 0 && events.revents & POLLIN)
289     {
290         asyncStorage->handleEvents();
291         pollRetVal = system.poll(&events, 1, 0);
292     }
293 }
294
295 void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
296 {
297     operationTimeout = timeout;
298 }