2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
24 #include <sdl/asyncstorage.hpp>
25 #include <sdl/backenderror.hpp>
26 #include <sdl/errorqueries.hpp>
27 #include <sdl/invalidnamespace.hpp>
28 #include <sdl/notconnected.hpp>
29 #include <sdl/operationinterrupted.hpp>
30 #include <sdl/rejectedbybackend.hpp>
31 #include <sdl/rejectedbysdl.hpp>
32 #include "private/redis/asyncredisstorage.hpp"
33 #include "private/syncstorageimpl.hpp"
34 #include "private/system.hpp"
36 using namespace shareddatalayer;
40 void throwExceptionForErrorCode[[ noreturn ]](const std::error_code& ec)
42 if (ec == shareddatalayer::Error::BACKEND_FAILURE)
43 throw BackendError(ec.message());
44 else if (ec == shareddatalayer::Error::NOT_CONNECTED)
45 throw NotConnected(ec.message());
46 else if (ec == shareddatalayer::Error::OPERATION_INTERRUPTED)
47 throw OperationInterrupted(ec.message());
48 else if (ec == shareddatalayer::Error::REJECTED_BY_BACKEND)
49 throw RejectedByBackend(ec.message());
50 else if (ec == AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE)
51 throw InvalidNamespace(ec.message());
52 else if (ec == shareddatalayer::Error::REJECTED_BY_SDL)
53 throw RejectedBySdl(ec.message());
55 std::ostringstream os;
56 os << "No corresponding SDL exception found for error code: " << ec.category().name() << " " << ec.value();
57 throw std::range_error(os.str());
61 /* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
62 * instead of current (bit error prone) poll based implementation.
65 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
66 SyncStorageImpl(std::move(asyncStorage), System::getSystem())
70 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> pAsyncStorage,
72 asyncStorage(std::move(pAsyncStorage)),
77 events{ asyncStorage->fd(), POLLIN, 0 },
78 operationTimeout(std::chrono::steady_clock::duration::zero())
82 void SyncStorageImpl::waitReadyAck(const std::error_code& error)
88 void SyncStorageImpl::modifyAck(const std::error_code& error)
94 void SyncStorageImpl::modifyIfAck(const std::error_code& error, bool status)
101 void SyncStorageImpl::getAck(const std::error_code& error, const DataMap& dataMap)
108 void SyncStorageImpl::findKeysAck(const std::error_code& error, const Keys& keys)
115 void SyncStorageImpl::verifyBackendResponse()
118 throwExceptionForErrorCode(localError);
121 void SyncStorageImpl::waitForOperationCallback()
124 pollAndHandleEvents(NO_TIMEOUT);
127 void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
129 if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
130 asyncStorage->handleEvents();
133 void SyncStorageImpl::waitForReadinessCheckCallback(const std::chrono::steady_clock::duration& timeout)
135 if (timeout == std::chrono::steady_clock::duration::zero())
138 pollAndHandleEvents(NO_TIMEOUT);
142 auto timeout_ms(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
143 auto pollTimeout_ms(timeout_ms / 10);
144 std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
145 while(!isReady && (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms)))
146 pollAndHandleEvents(pollTimeout_ms);
150 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
152 waitSdlToBeReady(ns, operationTimeout);
155 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout)
158 asyncStorage->waitReadyAsync(ns,
159 std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
161 std::placeholders::_1));
162 waitForReadinessCheckCallback(timeout);
165 void SyncStorageImpl::waitReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout)
167 waitSdlToBeReady(ns, timeout);
169 throw RejectedBySdl("Timeout, SDL service not ready for the '" + ns + "' namespace");
170 verifyBackendResponse();
173 void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
175 handlePendingEvents();
176 waitSdlToBeReady(ns);
179 asyncStorage->setAsync(ns,
181 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
183 std::placeholders::_1));
184 waitForOperationCallback();
185 verifyBackendResponse();
188 bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
190 handlePendingEvents();
191 waitSdlToBeReady(ns);
193 asyncStorage->setIfAsync(ns,
197 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
199 std::placeholders::_1,
200 std::placeholders::_2));
201 waitForOperationCallback();
202 verifyBackendResponse();
206 bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
208 handlePendingEvents();
209 waitSdlToBeReady(ns);
211 asyncStorage->setIfNotExistsAsync(ns,
214 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
216 std::placeholders::_1,
217 std::placeholders::_2));
218 waitForOperationCallback();
219 verifyBackendResponse();
223 SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
225 handlePendingEvents();
226 waitSdlToBeReady(ns);
228 asyncStorage->getAsync(ns,
230 std::bind(&shareddatalayer::SyncStorageImpl::getAck,
232 std::placeholders::_1,
233 std::placeholders::_2));
234 waitForOperationCallback();
235 verifyBackendResponse();
239 void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
241 handlePendingEvents();
242 waitSdlToBeReady(ns);
244 asyncStorage->removeAsync(ns,
246 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
248 std::placeholders::_1));
249 waitForOperationCallback();
250 verifyBackendResponse();
253 bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
255 handlePendingEvents();
256 waitSdlToBeReady(ns);
258 asyncStorage->removeIfAsync(ns,
261 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
263 std::placeholders::_1,
264 std::placeholders::_2));
265 waitForOperationCallback();
266 verifyBackendResponse();
270 SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
272 handlePendingEvents();
273 waitSdlToBeReady(ns);
275 asyncStorage->findKeysAsync(ns,
277 std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
279 std::placeholders::_1,
280 std::placeholders::_2));
281 waitForOperationCallback();
282 verifyBackendResponse();
286 SyncStorageImpl::Keys SyncStorageImpl::listKeys(const Namespace& ns, const std::string& pattern)
288 handlePendingEvents();
289 waitSdlToBeReady(ns);
291 asyncStorage->listKeys(ns,
293 std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
295 std::placeholders::_1,
296 std::placeholders::_2));
297 waitForOperationCallback();
298 verifyBackendResponse();
302 void SyncStorageImpl::removeAll(const Namespace& ns)
304 handlePendingEvents();
305 waitSdlToBeReady(ns);
307 asyncStorage->removeAllAsync(ns,
308 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
310 std::placeholders::_1));
311 waitForOperationCallback();
312 verifyBackendResponse();
315 void SyncStorageImpl::handlePendingEvents()
317 int pollRetVal = system.poll(&events, 1, 0);
319 while (pollRetVal > 0 && events.revents & POLLIN)
321 asyncStorage->handleEvents();
322 pollRetVal = system.poll(&events, 1, 0);
326 void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
328 operationTimeout = timeout;