2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
19 #include "private/error.hpp"
20 #include <sdl/emptynamespace.hpp>
21 #include <sdl/invalidnamespace.hpp>
22 #include <sdl/publisherid.hpp>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/logger.hpp"
27 #include "private/namespacevalidator.hpp"
28 #include "private/configurationreader.hpp"
29 #include "private/redis/asynccommanddispatcher.hpp"
30 #include "private/redis/asyncdatabasediscovery.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/redisgeneral.hpp"
35 #include "private/redis/reply.hpp"
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
40 /* TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
41 * When this new API is fully ready and tested, the old API implementation could be changed to utilize it
42 * (a bit like the sync API utilizes the async API). */
// Factory function that produces an AsyncCommandDispatcher by forwarding to
// AsyncCommandDispatcher::create(). It is passed as the dispatcher creator by the
// public AsyncRedisStorage constructor (see `::asyncCommandDispatcherCreator` there).
// NOTE(review): the arguments after `engine` in the create() call are not visible in
// this listing (original lines 53-57 are missing) — confirm against the full file.
47 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
48 const DatabaseInfo& databaseInfo,
49 std::shared_ptr<ContentsBuilder> contentsBuilder,
50 std::shared_ptr<Logger> logger)
52 return AsyncCommandDispatcher::create(engine,
// std::error_category specialization for AsyncRedisStorage::ErrorCode values.
// The three overrides declared here are defined out of line further down.
// NOTE(review): braces and access specifier are missing from this listing.
60 class AsyncRedisStorageErrorCategory: public std::error_category
63 AsyncRedisStorageErrorCategory() = default;
// Short identifying name of this category.
65 const char* name() const noexcept override;
// Human-readable text for the given AsyncRedisStorage::ErrorCode value.
67 std::string message(int condition) const override;
// Maps the given AsyncRedisStorage::ErrorCode value to an InternalError condition.
69 std::error_condition default_error_condition(int condition) const noexcept override;
// Returns the fixed category name string "asyncredisstorage".
72 const char* AsyncRedisStorageErrorCategory::name() const noexcept
74 return "asyncredisstorage";
// Returns a descriptive message for an AsyncRedisStorage::ErrorCode passed as int.
// `condition` is cast to the enum and switched on; each known enumerator has a fixed text.
77 std::string AsyncRedisStorageErrorCategory::message(int condition) const
79 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
81 case AsyncRedisStorage::ErrorCode::SUCCESS:
// Reuse whatever text a default-constructed std::error_code reports.
82 return std::error_code().message();
83 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
84 return "connection to the underlying data storage not yet available";
85 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
86 return "invalid namespace identifier passed to SDL API";
// END_MARKER is a loop sentinel, not a real error: log (once) and return a placeholder text.
87 case AsyncRedisStorage::ErrorCode::END_MARKER:
88 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
89 return "unsupported error code for message()";
// Fallback text containing the numeric value.
// NOTE(review): presumably reached via a `default:` label whose line is missing from this listing.
91 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
// Maps an AsyncRedisStorage::ErrorCode (passed as int) to the corresponding
// InternalError value, returned as std::error_condition.
95 std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
97 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
99 case AsyncRedisStorage::ErrorCode::SUCCESS:
100 return InternalError::SUCCESS;
101 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
102 return InternalError::SDL_NOT_READY;
103 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
104 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
// END_MARKER is only an enum loop sentinel: log (once) and report a logic error.
105 case AsyncRedisStorage::ErrorCode::END_MARKER:
106 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
107 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
// Fallback: build a diagnostic, log it once and report a logic error.
// NOTE(review): the `default:` label and the operand streamed after the text
// (original lines 108 and 111) are missing from this listing; presumably `condition` is appended.
109 std::ostringstream msg;
110 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
112 logErrorOnce(msg.str());
113 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
// Builds an AsyncStorage::DataMap from requested keys and a reply vector.
// For every key `j`, the reply at index `i` is inspected; only replies of type STRING
// produce an entry, other reply types are skipped for that key.
// NOTE(review): the declaration and increment of `i` and the final return are missing
// from this listing; presumably `i` advances in step with `keys` — confirm.
// NOTE(review): no bounds check on replyVector[i] is visible here; verify that callers
// guarantee replyVector has at least keys.size() elements.
117 AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
119 AsyncStorage::DataMap dataMap;
121 for (const auto& j : keys)
123 if (replyVector[i]->getType() == Reply::Type::STRING)
125 AsyncStorage::Data data;
126 auto dataStr(replyVector[i]->getString());
// Copy `len` characters one by one, converting each to uint8_t.
127 for (ReplyStringLength k(0); k < dataStr->len; ++k)
128 data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
129 dataMap.insert({ j, data });
// Extracts the key part from a reply string item: everything after the first
// occurrence of AsyncRedisStorage::SEPARATOR.
136 AsyncStorage::Key getKey(const Reply::DataItem& item)
// Build a string of exactly item.len characters from the item's buffer.
138 std::string str(item.str.c_str(), static_cast<size_t>(item.len));
139 auto res(str.find(AsyncRedisStorage::SEPARATOR));
// If the separator is absent, find() yields npos and `res + 1` wraps to 0,
// so the whole string is returned unchanged.
140 return str.substr(res + 1);
// Collects the keys contained in a reply vector: each element of type STRING is
// converted with getKey() and inserted into the result; other reply types are ignored.
// NOTE(review): the final return statement is missing from this listing; presumably `keys` is returned.
143 AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
145 AsyncStorage::Keys keys;
146 for (const auto& i : replyVector)
148 if (i->getType() == Reply::Type::STRING)
149 keys.insert(getKey(*i->getString()));
/**
 * Escapes, in place, every character that has a special meaning in a Redis
 * search (glob-style) pattern so that the string is matched literally.
 *
 * Each occurrence of '*', '?', '[', ']' or '\' is prefixed with a backslash.
 * Strings without any such character (including the empty string) are left untouched.
 *
 * @param stringToProcess String to escape; modified in place.
 */
void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
{
    const std::string redisSearchPatternCharacters = R"(*?[]\)";

    // Common case: nothing to escape, so avoid building a second string.
    if (stringToProcess.find_first_of(redisSearchPatternCharacters) == std::string::npos)
        return;

    // Single pass into a fresh buffer; repeated insert() into the middle of the
    // original string would shift the tail once per escaped character.
    std::string escaped;
    escaped.reserve(stringToProcess.size() * 2);
    for (const char character : stringToProcess)
    {
        if (redisSearchPatternCharacters.find(character) != std::string::npos)
            escaped.push_back('\\');
        escaped.push_back(character);
    }
    stringToProcess.swap(escaped);
}
// Pre-increment for AsyncRedisStorage::ErrorCode, used to step through the enumerators.
// Throws std::out_of_range when called on END_MARKER; otherwise advances `ecEnum`
// to the enumerator with the next underlying value.
// NOTE(review): the return statement is missing from this listing; presumably `ecEnum` is returned.
168 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
170 if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
171 throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
172 ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
// Creates a std::error_code from an AsyncRedisStorage::ErrorCode, bound to
// AsyncRedisStorage::errorCategory().
176 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
178 return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
// Returns the single AsyncRedisStorageErrorCategory instance, held as a
// function-local static so every call yields a reference to the same object.
181 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
183 static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
184 return theAsyncRedisStorageErrorCategory;
// Convenience constructor: delegates to the constructor that also takes a dispatcher
// creator and a ContentsBuilder, supplying the file-local ::asyncCommandDispatcherCreator
// and a ContentsBuilder constructed with SEPARATOR.
// NOTE(review): some delegated arguments (original lines 193-194 and 198) are missing from this listing.
187 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
188 std::shared_ptr<AsyncDatabaseDiscovery> discovery,
189 const boost::optional<PublisherId>& pId,
190 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
191 std::shared_ptr<Logger> logger):
192 AsyncRedisStorage(engine,
195 namespaceConfigurations,
196 ::asyncCommandDispatcherCreator,
197 std::make_shared<redis::ContentsBuilder>(SEPARATOR),
// Main constructor: stores the collaborators, validates the publisher ID and
// subscribes to database discovery state changes.
// Throws std::invalid_argument when a publisher ID is given but is an empty string.
// NOTE(review): several member initializers (e.g. for engine, publisherId, logger) are
// missing from this listing — confirm against the full file.
202 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
203 std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
204 const boost::optional<PublisherId>& pId,
205 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
206 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
207 std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
208 std::shared_ptr<Logger> logger):
211 discovery(discovery),
213 asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
214 contentsBuilder(contentsBuilder),
215 namespaceConfigurations(namespaceConfigurations),
// An optional publisher ID may be absent, but if present it must not be empty.
218 if(publisherId && (*publisherId).empty())
220 throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
// The callback captures `this`; the destructor clears it again via clearStateChangedCb().
223 discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
225 serviceStateChanged(databaseInfo);
// Destructor: unregisters the discovery state-change callback (which captures `this`)
// and disables command callbacks on the dispatcher.
// NOTE(review): any null checks guarding these calls (original lines 231 and 233) are
// missing from this listing; `dispatcher` is only assigned in serviceStateChanged(), so
// presumably it is checked before use — confirm.
229 AsyncRedisStorage::~AsyncRedisStorage()
232 discovery->clearStateChangedCb();
234 dispatcher->disableCommandCallbacks();
// Accessor returning a mutable reference to a redis::DatabaseInfo.
// NOTE(review): the body is missing from this listing; presumably it returns the
// `dbInfo` member assigned in serviceStateChanged() — confirm.
237 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
// Discovery callback: creates a new command dispatcher through the injected creator,
// asks it to report when connected, and records the new database info in `dbInfo`.
// Once connected, the stored `readyAck` is invoked with a success error code and then
// reset to a default-constructed ReadyAck.
// NOTE(review): the remaining creator arguments and any guard around the readyAck
// invocation (original lines 245-248, 250) are missing from this listing.
242 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
244 dispatcher = asyncCommandDispatcherCreator(*engine,
249 dispatcher->waitConnectedAsync([this]()
251 readyAck(std::error_code());
252 readyAck = ReadyAck();
254 dbInfo = newDatabaseInfo;
// Returns an int file descriptor associated with this storage.
// NOTE(review): the body is missing from this listing; presumably it forwards to the
// engine's descriptor — confirm against the full file.
257 int AsyncRedisStorage::fd() const
// Forwards event handling to the underlying engine.
262 void AsyncRedisStorage::handleEvents()
264 engine->handleEvents();
// Pre-flight check shared by the async operations. Writes the outcome to `ecToReturn`:
//  - INVALID_NAMESPACE when `ns` fails ::isValidNamespace() (also logged once),
//  - success when `noKeysGiven` is set and true,
//  - REDIS_NOT_YET_DISCOVERED in a further failure case,
//  - success otherwise.
// Callers treat a false return as "do not dispatch; report ecToReturn to the ack".
// NOTE(review): the return statements and the condition guarding the
// REDIS_NOT_YET_DISCOVERED branch are missing from this listing; presumably that branch
// is taken when no dispatcher exists yet — confirm.
267 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
268 boost::optional<bool> noKeysGiven,
269 std::error_code& ecToReturn)
271 if (!::isValidNamespace(ns))
273 logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
274 ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
// boost::none means "not applicable"; only an explicit true triggers this branch.
277 if (noKeysGiven && *noKeysGiven)
279 ecToReturn = std::error_code();
284 ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
288 ecToReturn = std::error_code();
// Requests notification when the storage is ready. The namespace argument is unused.
// Two paths are visible: register with the dispatcher so `readyAck` is called with a
// success code once connected, or store `readyAck` in the member of the same name
// (serviceStateChanged() invokes the stored one later).
// NOTE(review): the condition selecting between the paths is missing from this listing;
// presumably it depends on whether a dispatcher already exists — confirm.
292 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
293 const ReadyAck& readyAck)
// The lambda captures a copy of readyAck, so it does not depend on this object.
296 dispatcher->waitConnectedAsync([readyAck]()
298 readyAck(std::error_code());
301 this->readyAck = readyAck;
// Asynchronously writes all entries of `dataMap` under namespace `ns`.
// If canOperationBePerformed() rejects the request (an empty dataMap is passed as
// "no keys given"), `modifyAck` is posted to the engine with the resulting error code.
// Otherwise a command is dispatched with modificationCommandCallback as the reply handler:
// "MSETPUB" (with `ns` and getPublishMessage() appended) when notifications are enabled
// for `ns`, and "MSET" in the other visible dispatch.
// NOTE(review): the early return, the `else`, and part of the std::bind arguments are
// missing from this listing.
304 void AsyncRedisStorage::setAsync(const Namespace& ns,
305 const DataMap& dataMap,
306 const ModifyAck& modifyAck)
310 if (!canOperationBePerformed(ns, dataMap.empty(), ec))
312 engine->postCallback(std::bind(modifyAck, ec));
316 if (namespaceConfigurations->areNotificationsEnabled(ns))
317 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
319 std::placeholders::_1,
320 std::placeholders::_2,
323 contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
325 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
327 std::placeholders::_1,
328 std::placeholders::_2,
331 contentsBuilder->build("MSET", ns, dataMap));
// Reply handler bound by the unconditional modification operations (setAsync, removeAsync).
// NOTE(review): the middle parameter and the whole body are missing from this listing;
// presumably it forwards `error` to `modifyAck` — confirm against the full file.
334 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
336 const ModifyAck& modifyAck )
// Reply handler bound by the conditional operations (setIfAsync, setIfNotExistsAsync,
// removeIfAsync). Converts the reply into the boolean passed to `modifyIfAck`:
// false when the reply is NIL or an INTEGER other than 1, true in the other visible call.
// `error` is forwarded unchanged in both cases.
// NOTE(review): the reply parameter, the first operand of the condition and the `else`
// (original lines 342, 346, 350) are missing from this listing.
341 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
343 const ModifyIfAck& modifyIfAck)
345 auto type(reply.getType());
347 (type == Reply::Type::NIL) || // SETIE(PUB)
348 ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
349 modifyIfAck(error, false);
351 modifyIfAck(error, true);
// Asynchronously replaces the value of `key` with `newData` on the condition handled by
// the "SETIE"/"SETIEPUB" command, to which `oldData` is passed as the comparison value.
// On a failed canOperationBePerformed() check, `modifyIfAck` is posted with the error
// code and `false`. Otherwise "SETIEPUB" (with `ns` and getPublishMessage() appended) is
// dispatched when notifications are enabled for `ns`, and "SETIE" in the other visible
// dispatch; conditionalCommandCallback handles the reply.
// NOTE(review): the key/data parameters, the early return, the `else` and part of the
// std::bind arguments are missing from this listing.
354 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
358 const ModifyIfAck& modifyIfAck)
362 if (!canOperationBePerformed(ns, boost::none, ec))
364 engine->postCallback(std::bind(modifyIfAck, ec, false));
368 if (namespaceConfigurations->areNotificationsEnabled(ns))
369 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
371 std::placeholders::_1,
372 std::placeholders::_2,
375 contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
377 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
379 std::placeholders::_1,
380 std::placeholders::_2,
383 contentsBuilder->build("SETIE", ns, key, newData, oldData));
// Asynchronously removes `key` on the condition handled by the "DELIE"/"DELIEPUB"
// command, to which `data` is passed as the comparison value.
// On a failed canOperationBePerformed() check, `modifyIfAck` is posted with the error
// code and `false`. Otherwise "DELIEPUB" (with `ns` and getPublishMessage() appended) is
// dispatched when notifications are enabled for `ns`, and "DELIE" in the other visible
// dispatch; conditionalCommandCallback handles the reply.
// NOTE(review): the key/data parameters, the early return, the `else` and part of the
// std::bind arguments are missing from this listing.
386 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
389 const ModifyIfAck& modifyIfAck)
393 if (!canOperationBePerformed(ns, boost::none, ec))
395 engine->postCallback(std::bind(modifyIfAck, ec, false));
399 if (namespaceConfigurations->areNotificationsEnabled(ns))
400 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
402 std::placeholders::_1,
403 std::placeholders::_2,
406 contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
408 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
410 std::placeholders::_1,
411 std::placeholders::_2,
414 contentsBuilder->build("DELIE", ns, key, data));
// Returns the message string appended to the *PUB command variants built by the
// modification operations in this file.
// NOTE(review): the body is missing from this listing; presumably the result depends on
// the optional publisher ID — confirm against the full file.
417 std::string AsyncRedisStorage::getPublishMessage() const
// Asynchronously stores `data` under `key` using the "SETNX"/"SETNXPUB" command.
// On a failed canOperationBePerformed() check, `modifyIfAck` is posted with the error
// code and `false`. Otherwise "SETNXPUB" (with `ns` and getPublishMessage() appended) is
// dispatched when notifications are enabled for `ns`, and "SETNX" in the other visible
// dispatch; conditionalCommandCallback handles the reply.
// NOTE(review): the key/data parameters, the early return, the `else` and part of the
// std::bind arguments are missing from this listing.
425 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
428 const ModifyIfAck& modifyIfAck)
432 if (!canOperationBePerformed(ns, boost::none, ec))
434 engine->postCallback(std::bind(modifyIfAck, ec, false));
438 if (namespaceConfigurations->areNotificationsEnabled(ns))
439 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
441 std::placeholders::_1,
442 std::placeholders::_2,
445 contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
447 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
449 std::placeholders::_1,
450 std::placeholders::_2,
453 contentsBuilder->build("SETNX", ns, key, data));
// Asynchronously reads the values of `keys` in namespace `ns` with an "MGET" command.
// On a failed canOperationBePerformed() check (an empty key set is passed as "no keys
// given"), `getAck` is posted with the error code and an empty DataMap.
// The reply lambda captures copies of `getAck` and `keys`; it acks either with the
// error and an empty DataMap, or with success and the map built by buildDataMap().
// NOTE(review): the keys parameter, the early return and the condition choosing between
// the two acks are missing from this listing.
456 void AsyncRedisStorage::getAsync(const Namespace& ns,
458 const GetAck& getAck)
462 if (!canOperationBePerformed(ns, keys.empty(), ec))
464 engine->postCallback(std::bind(getAck, ec, DataMap()));
468 dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
472 getAck(error, DataMap());
474 getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
477 contentsBuilder->build("MGET", ns, keys));
// Asynchronously removes `keys` from namespace `ns`.
// On a failed canOperationBePerformed() check (an empty key set is passed as "no keys
// given"), `modifyAck` is posted with the error code. Otherwise "DELPUB" (with `ns` and
// getPublishMessage() appended) is dispatched when notifications are enabled for `ns`,
// and "DEL" in the other visible dispatch; modificationCommandCallback handles the reply.
// NOTE(review): the keys parameter, the early return, the `else` and part of the
// std::bind arguments are missing from this listing.
480 void AsyncRedisStorage::removeAsync(const Namespace& ns,
482 const ModifyAck& modifyAck)
486 if (!canOperationBePerformed(ns, keys.empty(), ec))
488 engine->postCallback(std::bind(modifyAck, ec));
492 if (namespaceConfigurations->areNotificationsEnabled(ns))
493 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
495 std::placeholders::_1,
496 std::placeholders::_2,
499 contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
501 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
503 std::placeholders::_1,
504 std::placeholders::_2,
507 contentsBuilder->build("DEL", ns, keys));
// Asynchronously finds the keys in namespace `ns` that start with `keyPrefix`, using a
// "KEYS" command with the pattern from buildKeyPrefixSearchPattern().
// On a failed canOperationBePerformed() check, `findKeysAck` is posted with the error
// code and an empty key set. The reply lambda acks either with the error and an empty
// key set, or with success and the keys extracted by getKeys().
// NOTE(review): the early return and the condition choosing between the two acks are
// missing from this listing.
510 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
511 const std::string& keyPrefix,
512 const FindKeysAck& findKeysAck)
514 //TODO: update to more optimal solution than current KEYS-based one.
517 if (!canOperationBePerformed(ns, boost::none, ec))
519 engine->postCallback(std::bind(findKeysAck, ec, Keys()));
523 dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
526 findKeysAck(error, Keys());
528 findKeysAck(std::error_code(), getKeys(*reply.getArray()));
531 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
// Asynchronously removes every key of namespace `ns` in two steps: a "KEYS" command with
// the empty-prefix pattern lists the keys, then the reply lambda passes them to removeAsync().
// On a failed canOperationBePerformed() check, `modifyAck` is posted with the error code.
// The lambda captures `this`, `modifyAck` and `ns` by copy. Two outcomes are visible:
// ack success directly, or call removeAsync() with the keys from getKeys().
// NOTE(review): the early return, the error handling and the condition choosing the
// direct success ack (presumably an empty reply array) are missing from this listing.
534 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
535 const ModifyAck& modifyAck)
539 if (!canOperationBePerformed(ns, boost::none, ec))
541 engine->postCallback(std::bind(modifyAck, ec));
545 dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
552 const auto& array(*reply.getArray());
554 modifyAck(std::error_code());
557 removeAsync(ns, getKeys(array), modifyAck);
561 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
// Builds the search pattern used with the "KEYS" command: "{<ns>}" followed by
// SEPARATOR, the escaped key prefix and a trailing "*".
// The prefix is copied and run through escapeRedisSearchPatternCharacters() so that
// pattern metacharacters in it are matched literally; `ns` is inserted as is.
// NOTE(review): the return statement is missing from this listing; presumably oss.str() is returned.
564 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
566 std::string escapedKeyPrefix = keyPrefix;
567 escapeRedisSearchPatternCharacters(escapedKeyPrefix);
568 std::ostringstream oss;
569 oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";