From 8324d029ce006509ddbc605446d05987c17e0368 Mon Sep 17 00:00:00 2001 From: Rolf Badorek Date: Tue, 17 Sep 2019 16:47:20 +0300 Subject: [PATCH] Add Redis Sentinel based database discovery This is first step to support forthcoming Redis HA (Sentinel) DBaaS deployment. If sentinel-based database discovery is used (currently still disabled by configure option), current master is asked from Sentinel. In case that Sentinel can't be connected, re-try will be triggered after one second delay. If reply parsing fails, it is considered as non-recoverable bug and execution is aborted. Currently, Sentinel address and Redis master name are still hard coded, will be made configurable in a separate commit soon. Also ordering change notifications from Sentinel will be implemented separately. Added new discovery type "SENTINEL" to 'sdltool test-connectivity' command output. Refactoring for 'AsyncStorageImpl' class unit tests, so that those will use database discovery mock implementation. Earlier implementation did have assumptions for database discovery behavior, which were not fulfilled any more when sentinel database discovery is used. Added option to 'AsyncCommandDispatcher' which defines if commands will be sent to Redis or to Sentinel. In latter case existence checking for Redis module extension commands is skipped. Signed-off-by: Rolf Badorek Change-Id: Id7507844c9b74115e52d6f8eaf9cb18198c5dc63 --- Makefile.am | 9 + configure.ac | 5 + include/private/asyncstorageimpl.hpp | 10 +- include/private/redis/asynccommanddispatcher.hpp | 3 +- .../redis/asynchirediscommanddispatcher.hpp | 7 +- .../redis/asyncsentineldatabasediscovery.hpp | 85 +++++ include/private/redis/databaseinfo.hpp | 1 + src/asyncstorageimpl.cpp | 27 +- src/cli/testconnectivitycommand.cpp | 4 + src/redis/asynccommanddispatcher.cpp | 7 +- src/redis/asyncdatabasediscovery.cpp | 12 +- src/redis/asynchirediscommanddispatcher.cpp | 61 ++-- src/redis/asyncredisstorage.cpp | 3 +- src/redis/asyncsentineldatabasediscovery.cpp | 172 ++++++++++ tst/asynchirediscommanddispatcher_test.cpp | 46 ++- tst/asyncsentineldatabasediscovery_test.cpp | 347 +++++++++++++++++++++ tst/asyncstorageimpl_test.cpp | 24 +- 17 files changed, 776 insertions(+), 47 deletions(-) create mode 100644 include/private/redis/asyncsentineldatabasediscovery.hpp create mode 100644 src/redis/asyncsentineldatabasediscovery.cpp create mode 100644 tst/asyncsentineldatabasediscovery_test.cpp diff --git a/Makefile.am b/Makefile.am index c740aca..acd2cb6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -117,6 +117,11 @@ libsdl_la_SOURCES += \ src/redis/hiredisclusterepolladapter.cpp \ src/redis/hiredisclustersystem.cpp endif +if SENTINEL +libsdl_la_SOURCES += \ + include/private/redis/asyncsentineldatabasediscovery.hpp \ + src/redis/asyncsentineldatabasediscovery.cpp +endif libsdl_la_CPPFLAGS = \ $(BASE_CPPFLAGS) \ $(HIREDIS_CFLAGS) \ @@ -370,6 +375,10 @@ testrunner_SOURCES += \ tst/hiredisclusterepolladapter_test.cpp \ tst/hiredisclustersystem_test.cpp endif +if SENTINEL +testrunner_SOURCES += \ + tst/asyncsentineldatabasediscovery_test.cpp +endif testrunner_CPPFLAGS = \ $(BASE_CPPFLAGS) \ -I$(top_srcdir)/3rdparty/googletest/googlemock/include \ diff --git a/configure.ac b/configure.ac index 777845d..6155bde 100644 --- a/configure.ac +++ b/configure.ac @@ -61,6 +61,11 @@ AC_SUBST(SDL_CONF_DIR) AC_DEFINE(HAVE_REDIS, [1], [Have redis]) AM_CONDITIONAL([REDIS], [test xtrue]) + +# @TODO Change to true when Redis HA support is activated. +AC_DEFINE(HAVE_SENTINEL, [0], [Have sentinel]) +AM_CONDITIONAL([SENTINEL], [test "xyes" = "xno"]) + PKG_CHECK_MODULES([HIREDIS], [hiredis]) AC_DEFINE(HAVE_HIREDIS, [1], [Have hiredis]) AM_CONDITIONAL([HIREDIS], [test xtrue]) diff --git a/include/private/asyncstorageimpl.hpp b/include/private/asyncstorageimpl.hpp index fcd2e05..d6a90df 100644 --- a/include/private/asyncstorageimpl.hpp +++ b/include/private/asyncstorageimpl.hpp @@ -17,12 +17,14 @@ #ifndef SHAREDDATALAYER_REDIS_ASYNCSTORAGEIMPL_HPP_ #define SHAREDDATALAYER_REDIS_ASYNCSTORAGEIMPL_HPP_ +#include #include #include #include "private/configurationreader.hpp" #include "private/databaseconfigurationimpl.hpp" #include "private/logger.hpp" #include "private/namespaceconfigurationsimpl.hpp" +#include "private/redis/asyncdatabasediscovery.hpp" namespace shareddatalayer { @@ -31,6 +33,10 @@ namespace shareddatalayer class AsyncStorageImpl: public AsyncStorage { public: + using AsyncDatabaseDiscoveryCreator = std::function(std::shared_ptr engine, + const DatabaseConfiguration& databaseConfiguration, + std::shared_ptr logger)>; + AsyncStorageImpl(const AsyncStorageImpl&) = delete; AsyncStorageImpl& operator = (const AsyncStorageImpl&) = delete; @@ -48,7 +54,8 @@ namespace shareddatalayer const boost::optional& pId, std::shared_ptr databaseConfiguration, std::shared_ptr namespaceConfigurations, - std::shared_ptr logger); + std::shared_ptr logger, + const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator); int fd() const override; @@ -80,6 +87,7 @@ namespace shareddatalayer std::shared_ptr namespaceConfigurations; const boost::optional publisherId; std::shared_ptr logger; + AsyncDatabaseDiscoveryCreator asyncDatabaseDiscoveryCreator; AsyncStorage& getRedisHandler(); AsyncStorage& getDummyHandler(); diff --git a/include/private/redis/asynccommanddispatcher.hpp b/include/private/redis/asynccommanddispatcher.hpp index a94975e..13633b7 100644 --- a/include/private/redis/asynccommanddispatcher.hpp +++ b/include/private/redis/asynccommanddispatcher.hpp @@ -66,7 +66,8 @@ namespace shareddatalayer const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, bool usePermanentCommandCallbacks, - std::shared_ptr logger); + std::shared_ptr logger, + bool usedForSentinel); protected: AsyncCommandDispatcher() = default; diff --git a/include/private/redis/asynchirediscommanddispatcher.hpp b/include/private/redis/asynchirediscommanddispatcher.hpp index 7a466c9..f39d882 100644 --- a/include/private/redis/asynchirediscommanddispatcher.hpp +++ b/include/private/redis/asynchirediscommanddispatcher.hpp @@ -55,7 +55,8 @@ namespace shareddatalayer uint16_t port, std::shared_ptr contentsBuilder, bool usePermanentCommandCallbacks, - std::shared_ptr logger); + std::shared_ptr logger, + bool usedForSentinel); AsyncHiredisCommandDispatcher(Engine& engine, const std::string& address, @@ -64,7 +65,8 @@ namespace shareddatalayer bool usePermanentCommandCallbacks, HiredisSystem& hiredisSystem, std::shared_ptr adapter, - std::shared_ptr logger); + std::shared_ptr logger, + bool usedForSentinel); ~AsyncHiredisCommandDispatcher() override; @@ -120,6 +122,7 @@ namespace shareddatalayer Timer::Duration connectionRetryTimerDuration; Timer::Duration connectionVerificationRetryTimerDuration; std::shared_ptr logger; + bool usedForSentinel; void connect(); diff --git a/include/private/redis/asyncsentineldatabasediscovery.hpp b/include/private/redis/asyncsentineldatabasediscovery.hpp new file mode 100644 index 0000000..527fa4e --- /dev/null +++ b/include/private/redis/asyncsentineldatabasediscovery.hpp @@ -0,0 +1,85 @@ +/* + Copyright (c) 2018-2019 Nokia. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef SHAREDDATALAYER_REDIS_ASYNCSENTINELDATABASEDISCOVERY_HPP_ +#define SHAREDDATALAYER_REDIS_ASYNCSENTINELDATABASEDISCOVERY_HPP_ + +#include +#include +#include "private/redis/asyncdatabasediscovery.hpp" +#include "private/redis/databaseinfo.hpp" +#include "private/logger.hpp" +#include "private/timer.hpp" + +namespace shareddatalayer +{ + namespace redis + { + class AsyncCommandDispatcher; + struct Contents; + class ContentsBuilder; + class Reply; + } + + class Engine; + + namespace redis + { + class AsyncSentinelDatabaseDiscovery: public AsyncDatabaseDiscovery + { + public: + using AsyncCommandDispatcherCreator = std::function(Engine& engine, + const redis::DatabaseInfo& databaseInfo, + std::shared_ptr contentsBuilder, + std::shared_ptr logger)>; + + AsyncSentinelDatabaseDiscovery(const AsyncSentinelDatabaseDiscovery&) = delete; + + AsyncSentinelDatabaseDiscovery& operator = (const AsyncSentinelDatabaseDiscovery&) = delete; + + AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, + std::shared_ptr logger); + + AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, + std::shared_ptr logger, + const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator, + std::shared_ptr contentsBuilder); + + ~AsyncSentinelDatabaseDiscovery() override = default; + + void setStateChangedCb(const StateChangedCb& stateChangedCb) override; + + void clearStateChangedCb() override; + + void setConnected(bool state); + private: + std::shared_ptr engine; + std::shared_ptr logger; + StateChangedCb stateChangedCb; + DatabaseInfo databaseInfo; + std::shared_ptr dispatcher; + std::shared_ptr contentsBuilder; + Timer masterInquiryRetryTimer; + Timer::Duration masterInquiryRetryTimerDuration; + + void sendMasterInquiry(); + + void masterInquiryAck(const std::error_code& error, const Reply& reply); + }; + } +} + +#endif diff --git a/include/private/redis/databaseinfo.hpp b/include/private/redis/databaseinfo.hpp index 670a880..d4e6cce 100644 --- a/include/private/redis/databaseinfo.hpp +++ b/include/private/redis/databaseinfo.hpp @@ -38,6 +38,7 @@ namespace shareddatalayer enum class Discovery { HIREDIS, + SENTINEL }; DatabaseConfiguration::Addresses hosts; diff --git a/src/asyncstorageimpl.cpp b/src/asyncstorageimpl.cpp index b17fcbd..59d7b22 100644 --- a/src/asyncstorageimpl.cpp +++ b/src/asyncstorageimpl.cpp @@ -23,11 +23,24 @@ #include "private/engine.hpp" #include "private/logger.hpp" #if HAVE_REDIS -#include "private/redis/asyncdatabasediscovery.hpp" #include "private/redis/asyncredisstorage.hpp" #endif using namespace shareddatalayer; +using namespace shareddatalayer::redis; + +namespace +{ + std::shared_ptr asyncDatabaseDiscoveryCreator(std::shared_ptr engine, + const DatabaseConfiguration& databaseConfiguration, + std::shared_ptr logger) + { + return AsyncDatabaseDiscovery::create(engine, + boost::none, + databaseConfiguration, + logger); + } +} AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, const boost::optional& pId, @@ -36,7 +49,8 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, databaseConfiguration(std::make_shared()), namespaceConfigurations(std::make_shared()), publisherId(pId), - logger(logger) + logger(logger), + asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator) { ConfigurationReader configurationReader(logger); configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration)); @@ -48,12 +62,14 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, const boost::optional& pId, std::shared_ptr databaseConfiguration, std::shared_ptr namespaceConfigurations, - std::shared_ptr logger): + std::shared_ptr logger, + const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator): engine(engine), databaseConfiguration(databaseConfiguration), namespaceConfigurations(namespaceConfigurations), publisherId(pId), - logger(logger) + logger(logger), + asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator) { } @@ -61,9 +77,8 @@ AsyncStorage& AsyncStorageImpl::getRedisHandler() { #if HAVE_REDIS static AsyncRedisStorage redisHandler{engine, - redis::AsyncDatabaseDiscovery::create( + asyncDatabaseDiscoveryCreator( engine, - boost::none, std::ref(*databaseConfiguration), logger), publisherId, diff --git a/src/cli/testconnectivitycommand.cpp b/src/cli/testconnectivitycommand.cpp index 7c4b2ad..87ac8c0 100644 --- a/src/cli/testconnectivitycommand.cpp +++ b/src/cli/testconnectivitycommand.cpp @@ -131,6 +131,10 @@ namespace out << "Discovery type:: HIREDIS" << std::endl; PrintStaticConfiguration(out); break; + case DatabaseInfo::Discovery::SENTINEL: + out << "Discovery type:: SENTINEL" << std::endl; + PrintStaticConfiguration(out); + break; } } diff --git a/src/redis/asynccommanddispatcher.cpp b/src/redis/asynccommanddispatcher.cpp index 25ec890..1a40f90 100644 --- a/src/redis/asynccommanddispatcher.cpp +++ b/src/redis/asynccommanddispatcher.cpp @@ -33,9 +33,11 @@ std::shared_ptr AsyncCommandDispatcher::create(Engine& e const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, bool usePermanentCommandCallbacks, - std::shared_ptr logger) + std::shared_ptr logger, + bool usedForSentinel) { #if HAVE_HIREDIS_VIP + static_cast(usedForSentinel); if (databaseInfo.type == DatabaseInfo::Type::CLUSTER) { return std::make_shared(engine, @@ -60,7 +62,8 @@ std::shared_ptr AsyncCommandDispatcher::create(Engine& e databaseInfo.hosts.at(0).getPort(), contentsBuilder, usePermanentCommandCallbacks, - logger); + logger, + usedForSentinel); #else SHAREDDATALAYER_ABORT("Not implemented."); #endif diff --git a/src/redis/asyncdatabasediscovery.cpp b/src/redis/asyncdatabasediscovery.cpp index 892e7cc..63f2f12 100644 --- a/src/redis/asyncdatabasediscovery.cpp +++ b/src/redis/asyncdatabasediscovery.cpp @@ -22,6 +22,9 @@ #if HAVE_HIREDIS #include "private/redis/asynchiredisdatabasediscovery.hpp" #endif +#if HAVE_SENTINEL +#include "private/redis/asyncsentineldatabasediscovery.hpp" +#endif #include "private/abort.hpp" using namespace shareddatalayer::redis; @@ -49,15 +52,22 @@ std::shared_ptr AsyncDatabaseDiscovery::create(std::shar SHAREDDATALAYER_ABORT("No Hiredis vip for Redis cluster configuration"); #endif else + { #if HAVE_HIREDIS +#if HAVE_SENTINEL + static_cast(ns); + return std::make_shared(engine, + logger); +#else return std::make_shared(engine, ns, DatabaseInfo::Type::SINGLE, staticAddresses, logger); +#endif #else static_cast(logger); SHAREDDATALAYER_ABORT("No Hiredis"); #endif + } } - diff --git a/src/redis/asynchirediscommanddispatcher.cpp b/src/redis/asynchirediscommanddispatcher.cpp index e4195a9..026da34 100644 --- a/src/redis/asynchirediscommanddispatcher.cpp +++ b/src/redis/asynchirediscommanddispatcher.cpp @@ -80,7 +80,8 @@ AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine, uint16_t port, std::shared_ptr contentsBuilder, bool usePermanentCommandCallbacks, - std::shared_ptr logger): + std::shared_ptr logger, + bool usedForSentinel): AsyncHiredisCommandDispatcher(engine, address, port, @@ -88,7 +89,8 @@ AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine, usePermanentCommandCallbacks, HiredisSystem::getHiredisSystem(), std::make_shared(engine), - logger) + logger, + usedForSentinel) { } @@ -99,7 +101,8 @@ AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine, bool usePermanentCommandCallbacks, HiredisSystem& hiredisSystem, std::shared_ptr adapter, - std::shared_ptr logger): + std::shared_ptr logger, + bool usedForSentinel): engine(engine), address(address), port(ntohs(port)), @@ -113,7 +116,8 @@ AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine, connectionRetryTimer(engine), connectionRetryTimerDuration(std::chrono::seconds(1)), connectionVerificationRetryTimerDuration(std::chrono::seconds(10)), - logger(logger) + logger(logger), + usedForSentinel(usedForSentinel) { connect(); @@ -140,28 +144,33 @@ void AsyncHiredisCommandDispatcher::connect() void AsyncHiredisCommandDispatcher::verifyConnection() { - /* When Redis has max amount of users, it will still accept new connections but will - * close them immediately. Therefore, we need to verify that just established connection - * really works. This prevents calling client readyAck callback for a connection that - * will be terminated immediately. - */ - /* Connection verification is now done by doing redis command list query. Because we anyway - * need to verify that Redis has required commands, we can now combine these two operations - * (command list query and connection verification). If either one of the functionalities - * is not needed in the future and it is removed, remember to still leave the other one. - */ - serviceState = ServiceState::CONNECTION_VERIFICATION; - /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if - * we are spontaneously connected to redis while timer is running. If connection verification - * fails, timer is armed again (normal handling in connection verification). - */ - connectionRetryTimer.disarm(); - dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply, - this, - std::placeholders::_1, - std::placeholders::_2), - contentsBuilder->build("COMMAND"), - false); + if (usedForSentinel) + setConnected(); + else + { + /* When Redis has max amount of users, it will still accept new connections but will + * close them immediately. Therefore, we need to verify that just established connection + * really works. This prevents calling client readyAck callback for a connection that + * will be terminated immediately. + */ + /* Connection verification is now done by doing redis command list query. Because we anyway + * need to verify that Redis has required commands, we can now combine these two operations + * (command list query and connection verification). If either one of the functionalities + * is not needed in the future and it is removed, remember to still leave the other one. + */ + serviceState = ServiceState::CONNECTION_VERIFICATION; + /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if + * we are spontaneously connected to redis while timer is running. If connection verification + * fails, timer is armed again (normal handling in connection verification). + */ + connectionRetryTimer.disarm(); + dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply, + this, + std::placeholders::_1, + std::placeholders::_2), + contentsBuilder->build("COMMAND"), + false); + } } void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error, diff --git a/src/redis/asyncredisstorage.cpp b/src/redis/asyncredisstorage.cpp index 22d4b2b..f6bf63d 100644 --- a/src/redis/asyncredisstorage.cpp +++ b/src/redis/asyncredisstorage.cpp @@ -53,7 +53,8 @@ namespace databaseInfo, contentsBuilder, false, - logger); + logger, + false); } class AsyncRedisStorageErrorCategory: public std::error_category diff --git a/src/redis/asyncsentineldatabasediscovery.cpp b/src/redis/asyncsentineldatabasediscovery.cpp new file mode 100644 index 0000000..05815e2 --- /dev/null +++ b/src/redis/asyncsentineldatabasediscovery.cpp @@ -0,0 +1,172 @@ +/* + Copyright (c) 2018-2019 Nokia. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include +#include +#include +#include "private/abort.hpp" +#include "private/hostandport.hpp" +#include "private/redis/asyncsentineldatabasediscovery.hpp" +#include "private/redis/asynccommanddispatcher.hpp" +#include "private/redis/contents.hpp" +#include "private/redis/contentsbuilder.hpp" +#include "private/redis/reply.hpp" + +using namespace shareddatalayer; +using namespace shareddatalayer::redis; + +namespace +{ + std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, + const DatabaseInfo& databaseInfo, + std::shared_ptr contentsBuilder, + std::shared_ptr logger); + + std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger); +} + +AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, + std::shared_ptr logger): + AsyncSentinelDatabaseDiscovery(engine, + logger, + ::asyncCommandDispatcherCreator, + std::make_shared(AsyncStorage::SEPARATOR)) +{ +} + +AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, + std::shared_ptr logger, + const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator, + std::shared_ptr contentsBuilder): + engine(engine), + logger(logger), + // @TODO Make configurable. + databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}), + DatabaseInfo::Type::SINGLE, + boost::none, + DatabaseInfo::Discovery::SENTINEL})), + contentsBuilder(contentsBuilder), + masterInquiryRetryTimer(*engine), + masterInquiryRetryTimerDuration(std::chrono::seconds(1)) +{ + dispatcher = asyncCommandDispatcherCreator(*engine, + databaseInfo, + contentsBuilder, + logger); +} + +void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb) +{ + stateChangedCb = cb; + dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); +} + +void AsyncSentinelDatabaseDiscovery::clearStateChangedCb() +{ + stateChangedCb = nullptr; +} + +void AsyncSentinelDatabaseDiscovery::sendMasterInquiry() +{ + dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck, + this, + std::placeholders::_1, + std::placeholders::_2), + "dummyNamespace", // Not meaningful for SENTINEL commands + contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable +} + +void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error, + const Reply& reply) +{ + if (!error) + { + auto hostAndPort = parseMasterInquiryReply(reply, *logger); + if (hostAndPort) + { + auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}), + DatabaseInfo::Type::SINGLE, + boost::none, + DatabaseInfo::Discovery::SENTINEL})); + if (stateChangedCb) + stateChangedCb(databaseInfo); + } + else + SHAREDDATALAYER_ABORT("Master inquiry reply parsing error."); + } + else + { + masterInquiryRetryTimer.arm( + masterInquiryRetryTimerDuration, + std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); + } +} + +namespace +{ + std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, + const DatabaseInfo& databaseInfo, + std::shared_ptr contentsBuilder, + std::shared_ptr logger) + { + return AsyncCommandDispatcher::create(engine, + databaseInfo, + contentsBuilder, + false, + logger, + true); + } + + std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger) + { + auto replyType = reply.getType(); + if (replyType == Reply::Type::ARRAY) + { + auto& replyVector(*reply.getArray()); + auto hostElementType = replyVector[0]->getType(); + if (hostElementType == Reply::Type::STRING) + { + auto host(replyVector[0]->getString()->str); + auto portElementType = replyVector[1]->getType(); + if (portElementType == Reply::Type::STRING) + { + auto port(replyVector[1]->getString()->str); + try + { + return std::unique_ptr(new HostAndPort(host+":"+port, 0));; + } + catch (const std::exception& e) + { + logger.debug() << "Invalid host or port in master inquiry reply, host: " + << host << ", port: " << port + << ", exception: " << e.what() << std::endl; + } + } + else + logger.debug() << "Invalid port element type in master inquiry reply: " + << static_cast(portElementType) << std::endl; + } + else + logger.debug() << "Invalid host element type in master inquiry reply: " + << static_cast(hostElementType) << std::endl; + } + else + logger.debug() << "Invalid master inquiry reply type: " + << static_cast(replyType) << std::endl; + return nullptr; + } +} diff --git a/tst/asynchirediscommanddispatcher_test.cpp b/tst/asynchirediscommanddispatcher_test.cpp index 55e9ac7..05f42e9 100644 --- a/tst/asynchirediscommanddispatcher_test.cpp +++ b/tst/asynchirediscommanddispatcher_test.cpp @@ -280,7 +280,8 @@ namespace false, hiredisSystemMock, adapterMock, - logger)); + logger, + false)); } ~AsyncHiredisCommandDispatcherDisconnectedTest() @@ -305,7 +306,8 @@ namespace true, hiredisSystemMock, adapterMock, - logger)); + logger, + false)); } ~AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest() @@ -397,6 +399,33 @@ namespace }; using AsyncHiredisCommandDispatcherDeathTest = AsyncHiredisCommandDispatcherConnectedTest; + + class AsyncHiredisCommandDispatcherForSentinelTest: public AsyncHiredisCommandDispatcherBaseTest + { + public: + AsyncHiredisCommandDispatcherForSentinelTest() + { + InSequence dummy; + expectationsUntilConnect(); + expectAdapterAttach(); + expectRedisAsyncSetConnectCallback(); + expectRedisAsyncSetDisconnectCallback(); + dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock, + "host", + htons(6379U), + contentsBuilderMock, + true, + hiredisSystemMock, + adapterMock, + logger, + true)); + } + + ~AsyncHiredisCommandDispatcherForSentinelTest() + { + expectRedisAsyncFree(); + } + }; } TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, IsNotCopyable) @@ -439,7 +468,8 @@ TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ContextErrorInConnectArmsR false, hiredisSystemMock, adapterMock, - logger)); + logger, + false)); expectDisarmConnectionRetryTimer(); } @@ -457,7 +487,8 @@ TEST_F(AsyncHiredisCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetry false, hiredisSystemMock, adapterMock, - logger)); + logger, + false)); } TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, FailedCommandListQueryArmsRetryTimer) @@ -947,3 +978,10 @@ TEST_F(AsyncHiredisCommandDispatcherDeathTest, TooManyRepliesAborts) savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd); EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), ""); } + +TEST_F(AsyncHiredisCommandDispatcherForSentinelTest, CommandListInquiryIsNotSent) +{ + EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(_, _, _, _, _, _)) + .Times(0); + connected(&ac, 0); +} diff --git a/tst/asyncsentineldatabasediscovery_test.cpp b/tst/asyncsentineldatabasediscovery_test.cpp new file mode 100644 index 0000000..dcf35c7 --- /dev/null +++ b/tst/asyncsentineldatabasediscovery_test.cpp @@ -0,0 +1,347 @@ +/* + Copyright (c) 2018-2019 Nokia. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include +#include +#include "private/createlogger.hpp" +#include "private/hostandport.hpp" +#include "private/timer.hpp" +#include "private/redis/asyncsentineldatabasediscovery.hpp" +#include "private/tst/asynccommanddispatchermock.hpp" +#include "private/tst/contentsbuildermock.hpp" +#include "private/tst/enginemock.hpp" +#include "private/tst/replymock.hpp" +#include "private/tst/wellknownerrorcode.hpp" + +using namespace shareddatalayer; +using namespace shareddatalayer::redis; +using namespace shareddatalayer::tst; +using namespace testing; + +namespace +{ + class AsyncSentinelDatabaseDiscoveryBaseTest: public testing::Test + { + public: + std::unique_ptr asyncSentinelDatabaseDiscovery; + std::shared_ptr> engineMock; + std::shared_ptr> dispatcherMock; + std::shared_ptr> contentsBuilderMock; + std::shared_ptr logger; + Contents contents; + AsyncCommandDispatcher::ConnectAck dispatcherConnectAck; + AsyncCommandDispatcher::CommandCb savedCommandCb; + ReplyMock replyMock; + std::string someHost; + uint16_t somePort; + Reply::DataItem hostDataItem; + Reply::DataItem portDataItem; + std::shared_ptr masterInquiryReplyHost; + std::shared_ptr masterInquiryReplyPort; + Reply::ReplyVector masterInquiryReply; + Timer::Duration expectedMasterInquiryRetryTimerDuration; + Timer::Callback savedConnectionRetryTimerCallback; + + AsyncSentinelDatabaseDiscoveryBaseTest(): + engineMock(std::make_shared>()), + dispatcherMock(std::make_shared>()), + contentsBuilderMock(std::make_shared>(AsyncStorage::SEPARATOR)), + logger(createLogger(SDL_LOG_PREFIX)), + contents({{"aaa","bbb"},{3,3}}), + someHost("somehost"), + somePort(1234), + hostDataItem({someHost,ReplyStringLength(someHost.length())}), + portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}), + masterInquiryReplyHost(std::make_shared()), + masterInquiryReplyPort(std::make_shared()), + expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)) + { + masterInquiryReply.push_back(masterInquiryReplyHost); + masterInquiryReply.push_back(masterInquiryReplyPort); + } + + virtual ~AsyncSentinelDatabaseDiscoveryBaseTest() + { + } + + std::shared_ptr asyncCommandDispatcherCreator(Engine&, + const DatabaseInfo&, + std::shared_ptr) + { + // @TODO Add database info checking when configuration support for sentinel is added. + newDispatcherCreated(); + return dispatcherMock; + } + + MOCK_METHOD0(newDispatcherCreated, void()); + + void expectNewDispatcherCreated() + { + EXPECT_CALL(*this, newDispatcherCreated()) + .Times(1); + } + + void expectDispatcherWaitConnectedAsync() + { + EXPECT_CALL(*dispatcherMock, waitConnectedAsync(_)) + .Times(1) + .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck) + { + dispatcherConnectAck = connectAck; + })); + } + + void expectContentsBuild(const std::string& string, + const std::string& string2, + const std::string& string3) + { + EXPECT_CALL(*contentsBuilderMock, build(string, string2, string3)) + .Times(1) + .WillOnce(Return(contents)); + } + + void expectDispatchAsync() + { + EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents)) + .Times(1) + .WillOnce(SaveArg<0>(&savedCommandCb)); + } + + void expectMasterInquiry() + { + expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster"); + expectDispatchAsync(); + } + + MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&)); + + void expectStateChangedCb() + { + EXPECT_CALL(*this, stateChangedCb(_)) + .Times(1) + .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo) + { + EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }), + ContainerEq(databaseInfo.hosts)); + EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type); + EXPECT_EQ(boost::none, databaseInfo.ns); + EXPECT_EQ(DatabaseInfo::Discovery::SENTINEL, databaseInfo.discovery); + })); + } + + void expectGetReplyType(ReplyMock& mock, const Reply::Type& type) + { + EXPECT_CALL(mock, getType()) + .Times(1) + .WillOnce(Return(type)); + } + + void expectGetReplyArray_ReturnMasterInquiryReply() + { + EXPECT_CALL(replyMock, getArray()) + .Times(1) + .WillOnce(Return(&masterInquiryReply)); + } + + void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item) + { + EXPECT_CALL(mock, getString()) + .Times(1) + .WillOnce(Return(&item)); + } + + void expectMasterIquiryReply() + { + expectGetReplyType(replyMock, Reply::Type::ARRAY); + expectGetReplyArray_ReturnMasterInquiryReply(); + expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING); + expectGetReplyString(*masterInquiryReplyHost, hostDataItem); + expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING); + expectGetReplyString(*masterInquiryReplyPort, portDataItem); + } + + void expectMasterInquiryRetryTimer() + { + EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _)) + .Times(1) + .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback)); + } + + void setDefaultResponsesForMasterInquiryReplyParsing() + { + ON_CALL(replyMock, getType()) + .WillByDefault(Return(Reply::Type::ARRAY)); + ON_CALL(replyMock, getArray()) + .WillByDefault(Return(&masterInquiryReply)); + ON_CALL(*masterInquiryReplyHost, getType()) + .WillByDefault(Return(Reply::Type::STRING)); + ON_CALL(*masterInquiryReplyHost, getString()) + .WillByDefault(Return(&hostDataItem)); + ON_CALL(*masterInquiryReplyPort, getType()) + .WillByDefault(Return(Reply::Type::STRING)); + ON_CALL(*masterInquiryReplyHost, getString()) + .WillByDefault(Return(&portDataItem)); + } + }; + + class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest + { + public: + AsyncSentinelDatabaseDiscoveryTest() + { + expectNewDispatcherCreated(); + asyncSentinelDatabaseDiscovery.reset( + new AsyncSentinelDatabaseDiscovery( + engineMock, + logger, + std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3), + contentsBuilderMock)); + } + }; + + using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest; +} + +TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable) +{ + InSequence dummy; + EXPECT_FALSE(std::is_copy_constructible::value); + EXPECT_FALSE(std::is_copy_assignable::value); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, ImplementsAsyncDatabaseDiscovery) +{ + InSequence dummy; + EXPECT_TRUE((std::is_base_of::value)); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + expectMasterIquiryReply(); + expectStateChangedCb(); + savedCommandCb(std::error_code(), replyMock); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + expectMasterInquiryRetryTimer(); + savedCommandCb(getWellKnownErrorCode(), replyMock); + expectMasterInquiry(); + savedConnectionRetryTimerCallback(); + expectMasterIquiryReply(); + expectStateChangedCb(); + savedCommandCb(std::error_code(), replyMock); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + ON_CALL(replyMock, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + setDefaultResponsesForMasterInquiryReplyParsing(); + ON_CALL(*masterInquiryReplyHost, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + setDefaultResponsesForMasterInquiryReplyParsing(); + ON_CALL(*masterInquiryReplyPort, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + setDefaultResponsesForMasterInquiryReplyParsing(); + std::string invalidPort("invalidPort"); + Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())}); + ON_CALL(*masterInquiryReplyPort, getString()) + .WillByDefault(Return(&invalidPortDataItem)); + EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared) +{ + InSequence dummy; + expectDispatcherWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, + this, + std::placeholders::_1)); + expectMasterInquiry(); + dispatcherConnectAck(); + expectMasterInquiryRetryTimer(); + savedCommandCb(getWellKnownErrorCode(), replyMock); + expectMasterInquiry(); + savedConnectionRetryTimerCallback(); + expectMasterIquiryReply(); + asyncSentinelDatabaseDiscovery->clearStateChangedCb(); + EXPECT_CALL(*this, stateChangedCb(_)) + .Times(0); + savedCommandCb(std::error_code(), replyMock); +} diff --git a/tst/asyncstorageimpl_test.cpp b/tst/asyncstorageimpl_test.cpp index b6a2319..a43c997 100644 --- a/tst/asyncstorageimpl_test.cpp +++ b/tst/asyncstorageimpl_test.cpp @@ -16,12 +16,14 @@ #include #include +#include "config.h" #include "private/asyncdummystorage.hpp" #include "private/asyncstorageimpl.hpp" #include "private/createlogger.hpp" #include "private/logger.hpp" #include "private/redis/asyncredisstorage.hpp" #include "private/tst/enginemock.hpp" +#include "private/tst/asyncdatabasediscoverymock.hpp" #include "private/tst/databaseconfigurationmock.hpp" #include "private/tst/namespaceconfigurationsmock.hpp" @@ -37,6 +39,7 @@ namespace std::shared_ptr> engineMock; std::shared_ptr dummyDatabaseConfiguration; std::shared_ptr> namespaceConfigurationsMock; + std::shared_ptr> discoveryMock; int fd; AsyncStorage::Namespace ns; Engine::Callback storedCallback; @@ -47,13 +50,30 @@ namespace engineMock(std::make_shared>()), dummyDatabaseConfiguration(std::make_shared()), namespaceConfigurationsMock(std::make_shared>()), + discoveryMock(std::make_shared>()), fd(10), ns("someKnownNamespace"), logger(createLogger(SDL_LOG_PREFIX)) { dummyDatabaseConfiguration->checkAndApplyDbType("redis-standalone"); dummyDatabaseConfiguration->checkAndApplyServerAddress("dummydatabaseaddress.local"); - asyncStorageImpl.reset(new AsyncStorageImpl(engineMock, boost::none, dummyDatabaseConfiguration, namespaceConfigurationsMock, logger)); + asyncStorageImpl.reset(new AsyncStorageImpl(engineMock, + boost::none, + dummyDatabaseConfiguration, + namespaceConfigurationsMock, + logger, + std::bind(&AsyncStorageImplTest::asyncDatabaseDiscoveryCreator, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3))); + } + + std::shared_ptr asyncDatabaseDiscoveryCreator(std::shared_ptr, + const DatabaseConfiguration&, + std::shared_ptr) + { + return discoveryMock; } void expectNamespaceConfigurationIsDbBackendUseEnabled_returnFalse() @@ -108,8 +128,6 @@ TEST_F(AsyncStorageImplTest, CorrectHandlerIsUsedBasedOnConfiguration) { InSequence dummy; expectNamespaceConfigurationIsDbBackendUseEnabled_returnTrue(); - //AsyncRedisStorage creation causes AsyncHiredisDatabaseDiscovery to post stateChanged callback - expectPostCallback(); AsyncStorage& returnedHandler1 = asyncStorageImpl->getOperationHandler(ns); EXPECT_EQ(typeid(AsyncRedisStorage&), typeid(returnedHandler1)); -- 2.16.6