2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
24 #include <sdl/asyncstorage.hpp>
25 #include <sdl/backenderror.hpp>
26 #include <sdl/errorqueries.hpp>
27 #include <sdl/invalidnamespace.hpp>
28 #include <sdl/notconnected.hpp>
29 #include <sdl/operationinterrupted.hpp>
30 #include <sdl/rejectedbybackend.hpp>
31 #include <sdl/rejectedbysdl.hpp>
32 #include "private/redis/asyncredisstorage.hpp"
33 #include "private/syncstorageimpl.hpp"
34 #include "private/system.hpp"
36 using namespace shareddatalayer;
40 void throwExceptionForErrorCode[[ noreturn ]](const std::error_code& ec)
42 if (ec == shareddatalayer::Error::BACKEND_FAILURE)
43 throw BackendError(ec.message());
44 else if (ec == shareddatalayer::Error::NOT_CONNECTED)
45 throw NotConnected(ec.message());
46 else if (ec == shareddatalayer::Error::OPERATION_INTERRUPTED)
47 throw OperationInterrupted(ec.message());
48 else if (ec == shareddatalayer::Error::REJECTED_BY_BACKEND)
49 throw RejectedByBackend(ec.message());
50 else if (ec == AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE)
51 throw InvalidNamespace(ec.message());
52 else if (ec == shareddatalayer::Error::REJECTED_BY_SDL)
53 throw RejectedBySdl(ec.message());
55 std::ostringstream os;
56 os << "No corresponding SDL exception found for error code: " << ec.category().name() << " " << ec.value();
57 throw std::range_error(os.str());
61 /* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
62 * instead of current (bit error prone) poll based implementation.
65 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
66 SyncStorageImpl(std::move(asyncStorage), System::getSystem())
70 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> pAsyncStorage,
72 asyncStorage(std::move(pAsyncStorage)),
77 events{ asyncStorage->fd(), POLLIN, 0 },
78 operationTimeout(std::chrono::steady_clock::duration::zero())
82 void SyncStorageImpl::waitReadyAck(const std::error_code&)
87 void SyncStorageImpl::modifyAck(const std::error_code& error)
93 void SyncStorageImpl::modifyIfAck(const std::error_code& error, bool status)
100 void SyncStorageImpl::getAck(const std::error_code& error, const DataMap& dataMap)
107 void SyncStorageImpl::findKeysAck(const std::error_code& error, const Keys& keys)
114 void SyncStorageImpl::verifyBackendResponse()
117 throwExceptionForErrorCode(localError);
120 void SyncStorageImpl::waitForOperationCallback()
123 pollAndHandleEvents(NO_TIMEOUT);
126 void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
128 if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
129 asyncStorage->handleEvents();
132 void SyncStorageImpl::waitForReadinessCheckCallback()
134 if (operationTimeout == std::chrono::steady_clock::duration::zero())
137 pollAndHandleEvents(NO_TIMEOUT);
141 int pollTimeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(operationTimeout).count() / 10;
142 std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
143 while(!isReady && (std::chrono::steady_clock::now() - start < operationTimeout))
144 pollAndHandleEvents(pollTimeout_ms);
148 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
151 asyncStorage->waitReadyAsync(ns,
152 std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
155 waitForReadinessCheckCallback();
158 void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
160 handlePendingEvents();
161 waitSdlToBeReady(ns);
164 asyncStorage->setAsync(ns,
166 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
168 std::placeholders::_1));
169 waitForOperationCallback();
170 verifyBackendResponse();
173 bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
175 handlePendingEvents();
176 waitSdlToBeReady(ns);
178 asyncStorage->setIfAsync(ns,
182 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
184 std::placeholders::_1,
185 std::placeholders::_2));
186 waitForOperationCallback();
187 verifyBackendResponse();
191 bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
193 handlePendingEvents();
194 waitSdlToBeReady(ns);
196 asyncStorage->setIfNotExistsAsync(ns,
199 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
201 std::placeholders::_1,
202 std::placeholders::_2));
203 waitForOperationCallback();
204 verifyBackendResponse();
208 SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
210 handlePendingEvents();
211 waitSdlToBeReady(ns);
213 asyncStorage->getAsync(ns,
215 std::bind(&shareddatalayer::SyncStorageImpl::getAck,
217 std::placeholders::_1,
218 std::placeholders::_2));
219 waitForOperationCallback();
220 verifyBackendResponse();
224 void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
226 handlePendingEvents();
227 waitSdlToBeReady(ns);
229 asyncStorage->removeAsync(ns,
231 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
233 std::placeholders::_1));
234 waitForOperationCallback();
235 verifyBackendResponse();
238 bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
240 handlePendingEvents();
241 waitSdlToBeReady(ns);
243 asyncStorage->removeIfAsync(ns,
246 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
248 std::placeholders::_1,
249 std::placeholders::_2));
250 waitForOperationCallback();
251 verifyBackendResponse();
255 SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
257 handlePendingEvents();
258 waitSdlToBeReady(ns);
260 asyncStorage->findKeysAsync(ns,
262 std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
264 std::placeholders::_1,
265 std::placeholders::_2));
266 waitForOperationCallback();
267 verifyBackendResponse();
271 void SyncStorageImpl::removeAll(const Namespace& ns)
273 handlePendingEvents();
274 waitSdlToBeReady(ns);
276 asyncStorage->removeAllAsync(ns,
277 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
279 std::placeholders::_1));
280 waitForOperationCallback();
281 verifyBackendResponse();
284 void SyncStorageImpl::handlePendingEvents()
286 int pollRetVal = system.poll(&events, 1, 0);
288 while (pollRetVal > 0 && events.revents & POLLIN)
290 asyncStorage->handleEvents();
291 pollRetVal = system.poll(&events, 1, 0);
295 void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
297 operationTimeout = timeout;