2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
19 #include "private/error.hpp"
20 #include <sdl/emptynamespace.hpp>
21 #include <sdl/invalidnamespace.hpp>
22 #include <sdl/publisherid.hpp>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/logger.hpp"
27 #include "private/namespacevalidator.hpp"
28 #include "private/configurationreader.hpp"
29 #include "private/redis/asynccommanddispatcher.hpp"
30 #include "private/redis/asyncdatabasediscovery.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/redisgeneral.hpp"
35 #include "private/redis/reply.hpp"
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
/* TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
 * Once this new API is fully ready and tested, the old API implementation could be changed to use it
 * (much like the sync API is built on top of the async API).
// Default dispatcher factory: the shorter AsyncRedisStorage constructor passes this function
// as its AsyncCommandDispatcherCreator. It forwards to AsyncCommandDispatcher::create().
// NOTE(review): in this view of the file the argument list of create() is cut off after
// `engine,` -- confirm the remaining arguments against the complete source.
47 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
48 const DatabaseInfo& databaseInfo,
49 std::shared_ptr<ContentsBuilder> contentsBuilder,
50 std::shared_ptr<Logger> logger)
52 return AsyncCommandDispatcher::create(engine,
// Error category for AsyncRedisStorage::ErrorCode values: supplies the category name,
// a textual description per code and the mapping to the library's InternalError conditions.
class AsyncRedisStorageErrorCategory: public std::error_category
{
public:
    AsyncRedisStorageErrorCategory() = default;

    // Returns the fixed identifier of this category.
    const char* name() const noexcept override;

    // Returns a description for the ErrorCode whose integer value is `condition`.
    std::string message(int condition) const override;

    // Maps the ErrorCode whose integer value is `condition` to an error condition.
    std::error_condition default_error_condition(int condition) const noexcept override;
};
71 const char* AsyncRedisStorageErrorCategory::name() const noexcept
73 return "asyncredisstorage";
76 std::string AsyncRedisStorageErrorCategory::message(int condition) const
78 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
80 case AsyncRedisStorage::ErrorCode::SUCCESS:
81 return std::error_code().message();
82 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
83 return "connection to the underlying data storage not yet available";
84 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
85 return "invalid namespace identifier passed to SDL API";
86 case AsyncRedisStorage::ErrorCode::END_MARKER:
87 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
88 return "unsupported error code for message()";
90 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
94 std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
96 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
98 case AsyncRedisStorage::ErrorCode::SUCCESS:
99 return InternalError::SUCCESS;
100 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
101 return InternalError::SDL_NOT_READY;
102 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
103 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
104 case AsyncRedisStorage::ErrorCode::END_MARKER:
105 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
106 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
108 std::ostringstream msg;
109 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
111 logErrorOnce(msg.str());
112 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
116 AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
118 AsyncStorage::DataMap dataMap;
120 for (const auto& j : keys)
122 if (replyVector[i]->getType() == Reply::Type::STRING)
124 AsyncStorage::Data data;
125 auto dataStr(replyVector[i]->getString());
126 for (ReplyStringLength k(0); k < dataStr->len; ++k)
127 data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
128 dataMap.insert({ j, data });
135 AsyncStorage::Key getKey(const Reply::DataItem& item)
137 std::string str(item.str.c_str(), static_cast<size_t>(item.len));
138 auto res(str.find(AsyncRedisStorage::SEPARATOR));
139 return str.substr(res + 1);
142 AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
144 AsyncStorage::Keys keys;
145 for (const auto& i : replyVector)
147 if (i->getType() == Reply::Type::STRING)
148 keys.insert(getKey(*i->getString()));
// Escapes, in place, every character of `stringToProcess` that has a special meaning in a
// Redis glob-style search pattern (`*`, `?`, `[`, `]`, `\`) by prefixing it with a backslash.
// All other characters are left untouched.
void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
{
    const std::string redisSearchPatternCharacters = R"(*?[]\)";

    // Build the result in one pass instead of repeatedly inserting into the middle
    // of the string, which shifts the tail on every match.
    std::string escaped;
    escaped.reserve(stringToProcess.size());
    for (const char character : stringToProcess)
    {
        if (redisSearchPatternCharacters.find(character) != std::string::npos)
            escaped.push_back('\\');
        escaped.push_back(character);
    }
    stringToProcess.swap(escaped);
}
167 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
169 if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
170 throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
171 ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
175 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
177 return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
180 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
182 static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
183 return theAsyncRedisStorageErrorCategory;
186 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
187 std::shared_ptr<AsyncDatabaseDiscovery> discovery,
188 const boost::optional<PublisherId>& pId,
189 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
190 std::shared_ptr<Logger> logger):
191 AsyncRedisStorage(engine,
194 namespaceConfigurations,
195 ::asyncCommandDispatcherCreator,
196 std::make_shared<redis::ContentsBuilder>(SEPARATOR),
// Full constructor: in addition to the arguments of the shorter constructor (which delegates
// here) it takes the dispatcher factory and the contents builder to use.
// Throws std::invalid_argument when a publisher ID is given but is an empty string.
// Registers a state-change callback on `discovery` that forwards the reported DatabaseInfo
// to serviceStateChanged().
// NOTE(review): some member initializers are not visible in this view of the file --
// confirm the complete initializer list against the full source.
201 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
202 std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
203 const boost::optional<PublisherId>& pId,
204 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
205 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
206 std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
207 std::shared_ptr<Logger> logger):
210 discovery(discovery),
212 asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
213 contentsBuilder(contentsBuilder),
214 namespaceConfigurations(namespaceConfigurations),
// An absent publisher ID is accepted; a present but empty one is rejected.
217 if(publisherId && (*publisherId).empty())
219 throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
// The callback captures a raw `this`; the destructor calls discovery->clearStateChangedCb().
222 discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
224 serviceStateChanged(databaseInfo);
228 AsyncRedisStorage::~AsyncRedisStorage()
231 discovery->clearStateChangedCb();
233 dispatcher->disableCommandCallbacks();
236 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
241 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
243 dispatcher = asyncCommandDispatcherCreator(*engine,
248 dispatcher->waitConnectedAsync([this]()
250 readyAck(std::error_code());
251 readyAck = ReadyAck();
253 dbInfo = newDatabaseInfo;
// Returns a file descriptor (as int) for this storage instance.
// NOTE(review): the body is not visible in this view of the file -- presumably the
// descriptor is obtained from `engine`; confirm against the full source.
256 int AsyncRedisStorage::fd() const
261 void AsyncRedisStorage::handleEvents()
263 engine->handleEvents();
266 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
267 boost::optional<bool> noKeysGiven,
268 std::error_code& ecToReturn)
270 if (!::isValidNamespace(ns))
272 logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
273 ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
276 if (noKeysGiven && *noKeysGiven)
278 ecToReturn = std::error_code();
283 ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
287 ecToReturn = std::error_code();
291 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
292 const ReadyAck& readyAck)
295 dispatcher->waitConnectedAsync([readyAck]()
297 readyAck(std::error_code());
300 this->readyAck = readyAck;
// Asynchronously writes the key/value pairs of `dataMap` into namespace `ns`.
// When canOperationBePerformed() rejects the request (an empty `dataMap` is passed to it as
// "no keys given"), `modifyAck` is posted to the engine with the resulting error code.
// Two dispatch calls are visible: an "MSETPUB" command (guarded by the notifications check,
// additionally carrying `ns` and getPublishMessage()) and a plain "MSET" command; both
// complete through modificationCommandCallback.
// NOTE(review): short lines (braces, `else`, `return`, some arguments) are not visible in
// this view of the file -- confirm the control flow against the full source.
303 void AsyncRedisStorage::setAsync(const Namespace& ns,
304 const DataMap& dataMap,
305 const ModifyAck& modifyAck)
309 if (!canOperationBePerformed(ns, dataMap.empty(), ec))
311 engine->postCallback(std::bind(modifyAck, ec));
315 if (namespaceConfigurations->areNotificationsEnabled(ns))
316 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
318 std::placeholders::_1,
319 std::placeholders::_2,
322 contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
324 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
326 std::placeholders::_1,
327 std::placeholders::_2,
330 contentsBuilder->build("MSET", ns, dataMap));
// Completion handler bound (via std::bind with two placeholders) by setAsync() and
// removeAsync() for their dispatched commands.
// NOTE(review): the middle parameter and the body are not visible in this view of the
// file -- presumably the error is handed on to `modifyAck`; confirm against the full source.
333 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
335 const ModifyAck& modifyAck )
// Completion handler bound by the conditional operations (setIfAsync, removeIfAsync,
// setIfNotExistsAsync). Reports through `modifyIfAck` whether the modification took place:
// a NIL reply, or an INTEGER reply other than 1, is reported as `false`.
// NOTE(review): the middle parameter, the first line of the condition and the `else` are
// not visible in this view of the file -- confirm the complete condition against the full source.
340 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
342 const ModifyIfAck& modifyIfAck)
344 auto type(reply.getType());
346 (type == Reply::Type::NIL) || // SETIE(PUB)
347 ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
348 modifyIfAck(error, false);
350 modifyIfAck(error, true);
// Conditional write of `key` in namespace `ns`, involving `oldData` and `newData`
// (command "SETIE", or "SETIEPUB" behind the notifications check, built with
// key, newData, oldData in that order).
// When canOperationBePerformed() rejects the request, `modifyIfAck` is posted to the engine
// with the resulting error code and `false`. The dispatched command completes through
// conditionalCommandCallback.
// NOTE(review): the key/data parameters and other short lines (braces, `else`, `return`,
// some arguments) are not visible in this view of the file -- confirm against the full source.
353 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
357 const ModifyIfAck& modifyIfAck)
361 if (!canOperationBePerformed(ns, boost::none, ec))
363 engine->postCallback(std::bind(modifyIfAck, ec, false));
367 if (namespaceConfigurations->areNotificationsEnabled(ns))
368 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
370 std::placeholders::_1,
371 std::placeholders::_2,
374 contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
376 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
378 std::placeholders::_1,
379 std::placeholders::_2,
382 contentsBuilder->build("SETIE", ns, key, newData, oldData));
// Conditional removal of `key` in namespace `ns`, involving `data`
// (command "DELIE", or "DELIEPUB" behind the notifications check).
// When canOperationBePerformed() rejects the request, `modifyIfAck` is posted to the engine
// with the resulting error code and `false`. The dispatched command completes through
// conditionalCommandCallback.
// NOTE(review): the key/data parameters and other short lines (braces, `else`, `return`,
// some arguments) are not visible in this view of the file -- confirm against the full source.
385 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
388 const ModifyIfAck& modifyIfAck)
392 if (!canOperationBePerformed(ns, boost::none, ec))
394 engine->postCallback(std::bind(modifyIfAck, ec, false));
398 if (namespaceConfigurations->areNotificationsEnabled(ns))
399 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
401 std::placeholders::_1,
402 std::placeholders::_2,
405 contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
407 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
409 std::placeholders::_1,
410 std::placeholders::_2,
413 contentsBuilder->build("DELIE", ns, key, data));
// Returns the message string that the *PUB command variants (MSETPUB, SETIEPUB, DELIEPUB,
// SETNXPUB, DELPUB) pass as their last build() argument.
// NOTE(review): the body is not visible in this view of the file -- presumably the result
// depends on `publisherId`; confirm against the full source.
416 std::string AsyncRedisStorage::getPublishMessage() const
// Conditional write of `key` with `data` in namespace `ns`
// (command "SETNX", or "SETNXPUB" behind the notifications check).
// When canOperationBePerformed() rejects the request, `modifyIfAck` is posted to the engine
// with the resulting error code and `false`. The dispatched command completes through
// conditionalCommandCallback.
// NOTE(review): the key/data parameters and other short lines (braces, `else`, `return`,
// some arguments) are not visible in this view of the file -- confirm against the full source.
424 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
427 const ModifyIfAck& modifyIfAck)
431 if (!canOperationBePerformed(ns, boost::none, ec))
433 engine->postCallback(std::bind(modifyIfAck, ec, false));
437 if (namespaceConfigurations->areNotificationsEnabled(ns))
438 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
440 std::placeholders::_1,
441 std::placeholders::_2,
444 contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
446 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
448 std::placeholders::_1,
449 std::placeholders::_2,
452 contentsBuilder->build("SETNX", ns, key, data));
// Asynchronously reads the values of `keys` from namespace `ns` with an "MGET" command.
// When canOperationBePerformed() rejects the request (empty `keys` is passed to it as
// "no keys given"), `getAck` is posted to the engine with the resulting error code and an
// empty DataMap. The dispatch callback reports either the error with an empty DataMap, or
// success with the map built by buildDataMap() from the reply array.
// NOTE(review): the `keys` parameter and other short lines (braces, `if`/`else`, `return`,
// some arguments) are not visible in this view of the file -- confirm against the full source.
455 void AsyncRedisStorage::getAsync(const Namespace& ns,
457 const GetAck& getAck)
461 if (!canOperationBePerformed(ns, keys.empty(), ec))
463 engine->postCallback(std::bind(getAck, ec, DataMap()));
// `getAck` and `keys` are captured by copy for use in the callback.
467 dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
471 getAck(error, DataMap());
473 getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
476 contentsBuilder->build("MGET", ns, keys));
// Asynchronously removes `keys` from namespace `ns`
// (command "DEL", or "DELPUB" behind the notifications check).
// When canOperationBePerformed() rejects the request (empty `keys` is passed to it as
// "no keys given"), `modifyAck` is posted to the engine with the resulting error code.
// The dispatched command completes through modificationCommandCallback.
// NOTE(review): the `keys` parameter and other short lines (braces, `else`, `return`,
// some arguments) are not visible in this view of the file -- confirm against the full source.
479 void AsyncRedisStorage::removeAsync(const Namespace& ns,
481 const ModifyAck& modifyAck)
485 if (!canOperationBePerformed(ns, keys.empty(), ec))
487 engine->postCallback(std::bind(modifyAck, ec));
491 if (namespaceConfigurations->areNotificationsEnabled(ns))
492 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
494 std::placeholders::_1,
495 std::placeholders::_2,
498 contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
500 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
502 std::placeholders::_1,
503 std::placeholders::_2,
506 contentsBuilder->build("DEL", ns, keys));
// Asynchronously looks up the keys of namespace `ns` that start with `keyPrefix`, using a
// "KEYS" command with the pattern from buildKeyPrefixSearchPattern().
// When canOperationBePerformed() rejects the request, `findKeysAck` is posted to the engine
// with the resulting error code and an empty key set. The dispatch callback reports either
// the error with an empty key set, or success with the keys extracted by getKeys() from
// the reply array.
// NOTE(review): short lines (braces, `if`/`else`, `return`, some arguments) are not visible
// in this view of the file -- confirm the control flow against the full source.
509 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
510 const std::string& keyPrefix,
511 const FindKeysAck& findKeysAck)
513 //TODO: update to more optimal solution than current KEYS-based one.
516 if (!canOperationBePerformed(ns, boost::none, ec))
518 engine->postCallback(std::bind(findKeysAck, ec, Keys()));
522 dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
525 findKeysAck(error, Keys());
527 findKeysAck(std::error_code(), getKeys(*reply.getArray()));
530 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
// Asynchronously removes every key of namespace `ns` in two steps: a "KEYS" command with an
// empty key prefix lists the keys, then removeAsync() is called with the keys extracted by
// getKeys() from the reply array. A direct modifyAck(std::error_code()) call is also visible
// in the callback.
// When canOperationBePerformed() rejects the request, `modifyAck` is posted to the engine
// with the resulting error code.
// NOTE(review): short lines (braces, `if`/`else`, `return`, some arguments) are not visible
// in this view of the file -- in particular the conditions that select between the direct
// acknowledgement and removeAsync(); confirm against the full source.
533 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
534 const ModifyAck& modifyAck)
538 if (!canOperationBePerformed(ns, boost::none, ec))
540 engine->postCallback(std::bind(modifyAck, ec));
// The callback captures `this` (for removeAsync) plus copies of `modifyAck` and `ns`.
544 dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
551 const auto& array(*reply.getArray());
553 modifyAck(std::error_code());
556 removeAsync(ns, getKeys(array), modifyAck);
560 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
563 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
565 std::string escapedKeyPrefix = keyPrefix;
566 escapeRedisSearchPatternCharacters(escapedKeyPrefix);
567 std::ostringstream oss;
568 oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";