src/redis/hiredisclusterepolladapter.cpp \
src/redis/hiredisclustersystem.cpp
endif
+if SENTINEL
+libsdl_la_SOURCES += \
+ include/private/redis/asyncsentineldatabasediscovery.hpp \
+ src/redis/asyncsentineldatabasediscovery.cpp
+endif
libsdl_la_CPPFLAGS = \
$(BASE_CPPFLAGS) \
$(HIREDIS_CFLAGS) \
tst/hiredisclusterepolladapter_test.cpp \
tst/hiredisclustersystem_test.cpp
endif
+if SENTINEL
+testrunner_SOURCES += \
+ tst/asyncsentineldatabasediscovery_test.cpp
+endif
testrunner_CPPFLAGS = \
$(BASE_CPPFLAGS) \
-I$(top_srcdir)/3rdparty/googletest/googlemock/include \
AC_DEFINE(HAVE_REDIS, [1], [Have redis])
AM_CONDITIONAL([REDIS], [test xtrue])
+
+# @TODO Change to true when Redis HA support is activated.
+AC_DEFINE(HAVE_SENTINEL, [0], [Have sentinel])
+AM_CONDITIONAL([SENTINEL], [test "xyes" = "xno"])
+
PKG_CHECK_MODULES([HIREDIS], [hiredis])
AC_DEFINE(HAVE_HIREDIS, [1], [Have hiredis])
AM_CONDITIONAL([HIREDIS], [test xtrue])
#ifndef SHAREDDATALAYER_REDIS_ASYNCSTORAGEIMPL_HPP_
#define SHAREDDATALAYER_REDIS_ASYNCSTORAGEIMPL_HPP_
+#include <functional>
#include <sdl/asyncstorage.hpp>
#include <sdl/publisherid.hpp>
#include "private/configurationreader.hpp"
#include "private/databaseconfigurationimpl.hpp"
#include "private/logger.hpp"
#include "private/namespaceconfigurationsimpl.hpp"
+#include "private/redis/asyncdatabasediscovery.hpp"
namespace shareddatalayer
{
class AsyncStorageImpl: public AsyncStorage
{
public:
+ using AsyncDatabaseDiscoveryCreator = std::function<std::shared_ptr<redis::AsyncDatabaseDiscovery>(std::shared_ptr<Engine> engine,
+ const DatabaseConfiguration& databaseConfiguration,
+ std::shared_ptr<Logger> logger)>;
+
AsyncStorageImpl(const AsyncStorageImpl&) = delete;
AsyncStorageImpl& operator = (const AsyncStorageImpl&) = delete;
const boost::optional<PublisherId>& pId,
std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
- std::shared_ptr<Logger> logger);
+ std::shared_ptr<Logger> logger,
+ const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator);
int fd() const override;
std::shared_ptr<NamespaceConfigurations> namespaceConfigurations;
const boost::optional<PublisherId> publisherId;
std::shared_ptr<Logger> logger;
+ AsyncDatabaseDiscoveryCreator asyncDatabaseDiscoveryCreator;
AsyncStorage& getRedisHandler();
AsyncStorage& getDummyHandler();
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
- std::shared_ptr<Logger> logger);
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel);
protected:
AsyncCommandDispatcher() = default;
uint16_t port,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
- std::shared_ptr<Logger> logger);
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel);
AsyncHiredisCommandDispatcher(Engine& engine,
const std::string& address,
bool usePermanentCommandCallbacks,
HiredisSystem& hiredisSystem,
std::shared_ptr<HiredisEpollAdapter> adapter,
- std::shared_ptr<Logger> logger);
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel);
~AsyncHiredisCommandDispatcher() override;
Timer::Duration connectionRetryTimerDuration;
Timer::Duration connectionVerificationRetryTimerDuration;
std::shared_ptr<Logger> logger;
+ bool usedForSentinel;
void connect();
--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef SHAREDDATALAYER_REDIS_ASYNCSENTINELDATABASEDISCOVERY_HPP_
+#define SHAREDDATALAYER_REDIS_ASYNCSENTINELDATABASEDISCOVERY_HPP_
+
+#include <functional>
+#include <system_error>
+#include "private/redis/asyncdatabasediscovery.hpp"
+#include "private/redis/databaseinfo.hpp"
+#include "private/logger.hpp"
+#include "private/timer.hpp"
+
+namespace shareddatalayer
+{
+ namespace redis
+ {
+ class AsyncCommandDispatcher;
+ struct Contents;
+ class ContentsBuilder;
+ class Reply;
+ }
+
+ class Engine;
+
+ namespace redis
+ {
+ class AsyncSentinelDatabaseDiscovery: public AsyncDatabaseDiscovery
+ {
+ public:
+ using AsyncCommandDispatcherCreator = std::function<std::shared_ptr<redis::AsyncCommandDispatcher>(Engine& engine,
+ const redis::DatabaseInfo& databaseInfo,
+ std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
+ std::shared_ptr<Logger> logger)>;
+
+ AsyncSentinelDatabaseDiscovery(const AsyncSentinelDatabaseDiscovery&) = delete;
+
+ AsyncSentinelDatabaseDiscovery& operator = (const AsyncSentinelDatabaseDiscovery&) = delete;
+
+ AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+ std::shared_ptr<Logger> logger);
+
+ AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+ std::shared_ptr<Logger> logger,
+ const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
+ std::shared_ptr<redis::ContentsBuilder> contentsBuilder);
+
+ ~AsyncSentinelDatabaseDiscovery() override = default;
+
+ void setStateChangedCb(const StateChangedCb& stateChangedCb) override;
+
+ void clearStateChangedCb() override;
+
+ void setConnected(bool state);
+ private:
+ std::shared_ptr<Engine> engine;
+ std::shared_ptr<Logger> logger;
+ StateChangedCb stateChangedCb;
+ DatabaseInfo databaseInfo;
+ std::shared_ptr<redis::AsyncCommandDispatcher> dispatcher;
+ std::shared_ptr<redis::ContentsBuilder> contentsBuilder;
+ Timer masterInquiryRetryTimer;
+ Timer::Duration masterInquiryRetryTimerDuration;
+
+ void sendMasterInquiry();
+
+ void masterInquiryAck(const std::error_code& error, const Reply& reply);
+ };
+ }
+}
+
+#endif
enum class Discovery
{
HIREDIS,
+ SENTINEL
};
DatabaseConfiguration::Addresses hosts;
#include "private/engine.hpp"
#include "private/logger.hpp"
#if HAVE_REDIS
-#include "private/redis/asyncdatabasediscovery.hpp"
#include "private/redis/asyncredisstorage.hpp"
#endif
using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+ std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+ const DatabaseConfiguration& databaseConfiguration,
+ std::shared_ptr<Logger> logger)
+ {
+ return AsyncDatabaseDiscovery::create(engine,
+ boost::none,
+ databaseConfiguration,
+ logger);
+ }
+}
AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
const boost::optional<PublisherId>& pId,
databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
publisherId(pId),
- logger(logger)
+ logger(logger),
+ asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
{
ConfigurationReader configurationReader(logger);
configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
const boost::optional<PublisherId>& pId,
std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
- std::shared_ptr<Logger> logger):
+ std::shared_ptr<Logger> logger,
+ const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
engine(engine),
databaseConfiguration(databaseConfiguration),
namespaceConfigurations(namespaceConfigurations),
publisherId(pId),
- logger(logger)
+ logger(logger),
+ asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
{
}
{
#if HAVE_REDIS
static AsyncRedisStorage redisHandler{engine,
- redis::AsyncDatabaseDiscovery::create(
+ asyncDatabaseDiscoveryCreator(
engine,
- boost::none,
std::ref(*databaseConfiguration),
logger),
publisherId,
out << "Discovery type:: HIREDIS" << std::endl;
PrintStaticConfiguration(out);
break;
+ case DatabaseInfo::Discovery::SENTINEL:
+ out << "Discovery type:: SENTINEL" << std::endl;
+ PrintStaticConfiguration(out);
+ break;
}
}
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
- std::shared_ptr<Logger> logger)
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel)
{
#if HAVE_HIREDIS_VIP
+ static_cast<void>(usedForSentinel);
if (databaseInfo.type == DatabaseInfo::Type::CLUSTER)
{
return std::make_shared<AsyncHiredisClusterCommandDispatcher>(engine,
databaseInfo.hosts.at(0).getPort(),
contentsBuilder,
usePermanentCommandCallbacks,
- logger);
+ logger,
+ usedForSentinel);
#else
SHAREDDATALAYER_ABORT("Not implemented.");
#endif
#if HAVE_HIREDIS
#include "private/redis/asynchiredisdatabasediscovery.hpp"
#endif
+#if HAVE_SENTINEL
+#include "private/redis/asyncsentineldatabasediscovery.hpp"
+#endif
#include "private/abort.hpp"
using namespace shareddatalayer::redis;
SHAREDDATALAYER_ABORT("No Hiredis vip for Redis cluster configuration");
#endif
else
+ {
#if HAVE_HIREDIS
+#if HAVE_SENTINEL
+ static_cast<void>(ns);
+ return std::make_shared<AsyncSentinelDatabaseDiscovery>(engine,
+ logger);
+#else
return std::make_shared<AsyncHiredisDatabaseDiscovery>(engine,
ns,
DatabaseInfo::Type::SINGLE,
staticAddresses,
logger);
+#endif
#else
static_cast<void>(logger);
SHAREDDATALAYER_ABORT("No Hiredis");
#endif
+ }
}
-
uint16_t port,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
- std::shared_ptr<Logger> logger):
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel):
AsyncHiredisCommandDispatcher(engine,
address,
port,
usePermanentCommandCallbacks,
HiredisSystem::getHiredisSystem(),
std::make_shared<HiredisEpollAdapter>(engine),
- logger)
+ logger,
+ usedForSentinel)
{
}
bool usePermanentCommandCallbacks,
HiredisSystem& hiredisSystem,
std::shared_ptr<HiredisEpollAdapter> adapter,
- std::shared_ptr<Logger> logger):
+ std::shared_ptr<Logger> logger,
+ bool usedForSentinel):
engine(engine),
address(address),
port(ntohs(port)),
connectionRetryTimer(engine),
connectionRetryTimerDuration(std::chrono::seconds(1)),
connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
- logger(logger)
+ logger(logger),
+ usedForSentinel(usedForSentinel)
{
connect();
void AsyncHiredisCommandDispatcher::verifyConnection()
{
- /* When Redis has max amount of users, it will still accept new connections but will
- * close them immediately. Therefore, we need to verify that just established connection
- * really works. This prevents calling client readyAck callback for a connection that
- * will be terminated immediately.
- */
- /* Connection verification is now done by doing redis command list query. Because we anyway
- * need to verify that Redis has required commands, we can now combine these two operations
- * (command list query and connection verification). If either one of the functionalities
- * is not needed in the future and it is removed, remember to still leave the other one.
- */
- serviceState = ServiceState::CONNECTION_VERIFICATION;
- /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
- * we are spontaneously connected to redis while timer is running. If connection verification
- * fails, timer is armed again (normal handling in connection verification).
- */
- connectionRetryTimer.disarm();
- dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
- this,
- std::placeholders::_1,
- std::placeholders::_2),
- contentsBuilder->build("COMMAND"),
- false);
+ if (usedForSentinel)
+ setConnected();
+ else
+ {
+ /* When Redis has max amount of users, it will still accept new connections but will
+ * close them immediately. Therefore, we need to verify that just established connection
+ * really works. This prevents calling client readyAck callback for a connection that
+ * will be terminated immediately.
+ */
+ /* Connection verification is now done by doing redis command list query. Because we anyway
+ * need to verify that Redis has required commands, we can now combine these two operations
+ * (command list query and connection verification). If either one of the functionalities
+ * is not needed in the future and it is removed, remember to still leave the other one.
+ */
+ serviceState = ServiceState::CONNECTION_VERIFICATION;
+ /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
+ * we are spontaneously connected to redis while timer is running. If connection verification
+ * fails, timer is armed again (normal handling in connection verification).
+ */
+ connectionRetryTimer.disarm();
+ dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ contentsBuilder->build("COMMAND"),
+ false);
+ }
}
void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
databaseInfo,
contentsBuilder,
false,
- logger);
+ logger,
+ false);
}
class AsyncRedisStorageErrorCategory: public std::error_category
--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include <arpa/inet.h>
+#include <iostream>
+#include <string>
+#include <sdl/asyncstorage.hpp>
+#include "private/abort.hpp"
+#include "private/hostandport.hpp"
+#include "private/redis/asyncsentineldatabasediscovery.hpp"
+#include "private/redis/asynccommanddispatcher.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/contentsbuilder.hpp"
+#include "private/redis/reply.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+ std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
+ const DatabaseInfo& databaseInfo,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ std::shared_ptr<Logger> logger);
+
+ std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
+}
+
+AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+ std::shared_ptr<Logger> logger):
+ AsyncSentinelDatabaseDiscovery(engine,
+ logger,
+ ::asyncCommandDispatcherCreator,
+ std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
+{
+}
+
+AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+ std::shared_ptr<Logger> logger,
+ const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
+ std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
+ engine(engine),
+ logger(logger),
+ // @TODO Make configurable.
+ databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
+ DatabaseInfo::Type::SINGLE,
+ boost::none,
+ DatabaseInfo::Discovery::SENTINEL})),
+ contentsBuilder(contentsBuilder),
+ masterInquiryRetryTimer(*engine),
+ masterInquiryRetryTimerDuration(std::chrono::seconds(1))
+{
+ dispatcher = asyncCommandDispatcherCreator(*engine,
+ databaseInfo,
+ contentsBuilder,
+ logger);
+}
+
+void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
+{
+ stateChangedCb = cb;
+ dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+}
+
+void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
+{
+ stateChangedCb = nullptr;
+}
+
+void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
+{
+ dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ "dummyNamespace", // Not meaningful for SENTINEL commands
+ contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
+}
+
+void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
+ const Reply& reply)
+{
+ if (!error)
+ {
+ auto hostAndPort = parseMasterInquiryReply(reply, *logger);
+ if (hostAndPort)
+ {
+ auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
+ DatabaseInfo::Type::SINGLE,
+ boost::none,
+ DatabaseInfo::Discovery::SENTINEL}));
+ if (stateChangedCb)
+ stateChangedCb(databaseInfo);
+ }
+ else
+ SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
+ }
+ else
+ {
+ masterInquiryRetryTimer.arm(
+ masterInquiryRetryTimerDuration,
+ std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+ }
+}
+
+namespace
+{
+ std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
+ const DatabaseInfo& databaseInfo,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ std::shared_ptr<Logger> logger)
+ {
+ return AsyncCommandDispatcher::create(engine,
+ databaseInfo,
+ contentsBuilder,
+ false,
+ logger,
+ true);
+ }
+
+ std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
+ {
+ auto replyType = reply.getType();
+ if (replyType == Reply::Type::ARRAY)
+ {
+ auto& replyVector(*reply.getArray());
+ auto hostElementType = replyVector[0]->getType();
+ if (hostElementType == Reply::Type::STRING)
+ {
+ auto host(replyVector[0]->getString()->str);
+ auto portElementType = replyVector[1]->getType();
+ if (portElementType == Reply::Type::STRING)
+ {
+ auto port(replyVector[1]->getString()->str);
+ try
+ {
+ return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
+ }
+ catch (const std::exception& e)
+ {
+ logger.debug() << "Invalid host or port in master inquiry reply, host: "
+ << host << ", port: " << port
+ << ", exception: " << e.what() << std::endl;
+ }
+ }
+ else
+ logger.debug() << "Invalid port element type in master inquiry reply: "
+ << static_cast<int>(portElementType) << std::endl;
+ }
+ else
+ logger.debug() << "Invalid host element type in master inquiry reply: "
+ << static_cast<int>(hostElementType) << std::endl;
+ }
+ else
+ logger.debug() << "Invalid master inquiry reply type: "
+ << static_cast<int>(replyType) << std::endl;
+ return nullptr;
+ }
+}
false,
hiredisSystemMock,
adapterMock,
- logger));
+ logger,
+ false));
}
~AsyncHiredisCommandDispatcherDisconnectedTest()
true,
hiredisSystemMock,
adapterMock,
- logger));
+ logger,
+ false));
}
~AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
};
using AsyncHiredisCommandDispatcherDeathTest = AsyncHiredisCommandDispatcherConnectedTest;
+
+ class AsyncHiredisCommandDispatcherForSentinelTest: public AsyncHiredisCommandDispatcherBaseTest
+ {
+ public:
+ AsyncHiredisCommandDispatcherForSentinelTest()
+ {
+ InSequence dummy;
+ expectationsUntilConnect();
+ expectAdapterAttach();
+ expectRedisAsyncSetConnectCallback();
+ expectRedisAsyncSetDisconnectCallback();
+ dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
+ "host",
+ htons(6379U),
+ contentsBuilderMock,
+ true,
+ hiredisSystemMock,
+ adapterMock,
+ logger,
+ true));
+ }
+
+ ~AsyncHiredisCommandDispatcherForSentinelTest()
+ {
+ expectRedisAsyncFree();
+ }
+ };
}
TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, IsNotCopyable)
false,
hiredisSystemMock,
adapterMock,
- logger));
+ logger,
+ false));
expectDisarmConnectionRetryTimer();
}
false,
hiredisSystemMock,
adapterMock,
- logger));
+ logger,
+ false));
}
TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, FailedCommandListQueryArmsRetryTimer)
savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd);
EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
}
+
+TEST_F(AsyncHiredisCommandDispatcherForSentinelTest, CommandListInquiryIsNotSent)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(_, _, _, _, _, _))
+ .Times(0);
+ connected(&ac, 0);
+}
--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include <gtest/gtest.h>
+#include <arpa/inet.h>
+#include <sdl/asyncstorage.hpp>
+#include "private/createlogger.hpp"
+#include "private/hostandport.hpp"
+#include "private/timer.hpp"
+#include "private/redis/asyncsentineldatabasediscovery.hpp"
+#include "private/tst/asynccommanddispatchermock.hpp"
+#include "private/tst/contentsbuildermock.hpp"
+#include "private/tst/enginemock.hpp"
+#include "private/tst/replymock.hpp"
+#include "private/tst/wellknownerrorcode.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+using namespace shareddatalayer::tst;
+using namespace testing;
+
+namespace
+{
+ class AsyncSentinelDatabaseDiscoveryBaseTest: public testing::Test
+ {
+ public:
+ std::unique_ptr<AsyncSentinelDatabaseDiscovery> asyncSentinelDatabaseDiscovery;
+ std::shared_ptr<StrictMock<EngineMock>> engineMock;
+ std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> dispatcherMock;
+ std::shared_ptr<StrictMock<ContentsBuilderMock>> contentsBuilderMock;
+ std::shared_ptr<Logger> logger;
+ Contents contents;
+ AsyncCommandDispatcher::ConnectAck dispatcherConnectAck;
+ AsyncCommandDispatcher::CommandCb savedCommandCb;
+ ReplyMock replyMock;
+ std::string someHost;
+ uint16_t somePort;
+ Reply::DataItem hostDataItem;
+ Reply::DataItem portDataItem;
+ std::shared_ptr<ReplyMock> masterInquiryReplyHost;
+ std::shared_ptr<ReplyMock> masterInquiryReplyPort;
+ Reply::ReplyVector masterInquiryReply;
+ Timer::Duration expectedMasterInquiryRetryTimerDuration;
+ Timer::Callback savedConnectionRetryTimerCallback;
+
+ AsyncSentinelDatabaseDiscoveryBaseTest():
+ engineMock(std::make_shared<StrictMock<EngineMock>>()),
+ dispatcherMock(std::make_shared<StrictMock<AsyncCommandDispatcherMock>>()),
+ contentsBuilderMock(std::make_shared<StrictMock<ContentsBuilderMock>>(AsyncStorage::SEPARATOR)),
+ logger(createLogger(SDL_LOG_PREFIX)),
+ contents({{"aaa","bbb"},{3,3}}),
+ someHost("somehost"),
+ somePort(1234),
+ hostDataItem({someHost,ReplyStringLength(someHost.length())}),
+ portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}),
+ masterInquiryReplyHost(std::make_shared<ReplyMock>()),
+ masterInquiryReplyPort(std::make_shared<ReplyMock>()),
+ expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1))
+ {
+ masterInquiryReply.push_back(masterInquiryReplyHost);
+ masterInquiryReply.push_back(masterInquiryReplyPort);
+ }
+
+ virtual ~AsyncSentinelDatabaseDiscoveryBaseTest()
+ {
+ }
+
+ std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine&,
+ const DatabaseInfo&,
+ std::shared_ptr<ContentsBuilder>)
+ {
+ // @TODO Add database info checking when configuration support for sentinel is added.
+ newDispatcherCreated();
+ return dispatcherMock;
+ }
+
+ MOCK_METHOD0(newDispatcherCreated, void());
+
+ void expectNewDispatcherCreated()
+ {
+ EXPECT_CALL(*this, newDispatcherCreated())
+ .Times(1);
+ }
+
+ void expectDispatcherWaitConnectedAsync()
+ {
+ EXPECT_CALL(*dispatcherMock, waitConnectedAsync(_))
+ .Times(1)
+ .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck)
+ {
+ dispatcherConnectAck = connectAck;
+ }));
+ }
+
+ void expectContentsBuild(const std::string& string,
+ const std::string& string2,
+ const std::string& string3)
+ {
+ EXPECT_CALL(*contentsBuilderMock, build(string, string2, string3))
+ .Times(1)
+ .WillOnce(Return(contents));
+ }
+
+ void expectDispatchAsync()
+ {
+ EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&savedCommandCb));
+ }
+
+ void expectMasterInquiry()
+ {
+ expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster");
+ expectDispatchAsync();
+ }
+
+ MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&));
+
+ void expectStateChangedCb()
+ {
+ EXPECT_CALL(*this, stateChangedCb(_))
+ .Times(1)
+ .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo)
+ {
+ EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }),
+ ContainerEq(databaseInfo.hosts));
+ EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type);
+ EXPECT_EQ(boost::none, databaseInfo.ns);
+ EXPECT_EQ(DatabaseInfo::Discovery::SENTINEL, databaseInfo.discovery);
+ }));
+ }
+
+ void expectGetReplyType(ReplyMock& mock, const Reply::Type& type)
+ {
+ EXPECT_CALL(mock, getType())
+ .Times(1)
+ .WillOnce(Return(type));
+ }
+
+ void expectGetReplyArray_ReturnMasterInquiryReply()
+ {
+ EXPECT_CALL(replyMock, getArray())
+ .Times(1)
+ .WillOnce(Return(&masterInquiryReply));
+ }
+
+ void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item)
+ {
+ EXPECT_CALL(mock, getString())
+ .Times(1)
+ .WillOnce(Return(&item));
+ }
+
+ void expectMasterIquiryReply()
+ {
+ expectGetReplyType(replyMock, Reply::Type::ARRAY);
+ expectGetReplyArray_ReturnMasterInquiryReply();
+ expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING);
+ expectGetReplyString(*masterInquiryReplyHost, hostDataItem);
+ expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING);
+ expectGetReplyString(*masterInquiryReplyPort, portDataItem);
+ }
+
+ void expectMasterInquiryRetryTimer()
+ {
+ EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
+ .Times(1)
+ .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+ }
+
+ void setDefaultResponsesForMasterInquiryReplyParsing()
+ {
+ ON_CALL(replyMock, getType())
+ .WillByDefault(Return(Reply::Type::ARRAY));
+ ON_CALL(replyMock, getArray())
+ .WillByDefault(Return(&masterInquiryReply));
+ ON_CALL(*masterInquiryReplyHost, getType())
+ .WillByDefault(Return(Reply::Type::STRING));
+ ON_CALL(*masterInquiryReplyHost, getString())
+ .WillByDefault(Return(&hostDataItem));
+ ON_CALL(*masterInquiryReplyPort, getType())
+ .WillByDefault(Return(Reply::Type::STRING));
+ ON_CALL(*masterInquiryReplyHost, getString())
+ .WillByDefault(Return(&portDataItem));
+ }
+ };
+
+ class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest
+ {
+ public:
+ AsyncSentinelDatabaseDiscoveryTest()
+ {
+ expectNewDispatcherCreated();
+ asyncSentinelDatabaseDiscovery.reset(
+ new AsyncSentinelDatabaseDiscovery(
+ engineMock,
+ logger,
+ std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3),
+ contentsBuilderMock));
+ }
+ };
+
+ using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest;
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable)
+{
+ InSequence dummy;
+ EXPECT_FALSE(std::is_copy_constructible<AsyncSentinelDatabaseDiscovery>::value);
+ EXPECT_FALSE(std::is_copy_assignable<AsyncSentinelDatabaseDiscovery>::value);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, ImplementsAsyncDatabaseDiscovery)
+{
+ InSequence dummy;
+ EXPECT_TRUE((std::is_base_of<AsyncDatabaseDiscovery, AsyncSentinelDatabaseDiscovery>::value));
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterIquiryReply();
+ expectStateChangedCb();
+ savedCommandCb(std::error_code(), replyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterInquiryRetryTimer();
+ savedCommandCb(getWellKnownErrorCode(), replyMock);
+ expectMasterInquiry();
+ savedConnectionRetryTimerCallback();
+ expectMasterIquiryReply();
+ expectStateChangedCb();
+ savedCommandCb(std::error_code(), replyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ ON_CALL(replyMock, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ setDefaultResponsesForMasterInquiryReplyParsing();
+ ON_CALL(*masterInquiryReplyHost, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ setDefaultResponsesForMasterInquiryReplyParsing();
+ ON_CALL(*masterInquiryReplyPort, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ setDefaultResponsesForMasterInquiryReplyParsing();
+ std::string invalidPort("invalidPort");
+ Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())});
+ ON_CALL(*masterInquiryReplyPort, getString())
+ .WillByDefault(Return(&invalidPortDataItem));
+ EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared)
+{
+ InSequence dummy;
+ expectDispatcherWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterInquiryRetryTimer();
+ savedCommandCb(getWellKnownErrorCode(), replyMock);
+ expectMasterInquiry();
+ savedConnectionRetryTimerCallback();
+ expectMasterIquiryReply();
+ asyncSentinelDatabaseDiscovery->clearStateChangedCb();
+ EXPECT_CALL(*this, stateChangedCb(_))
+ .Times(0);
+ savedCommandCb(std::error_code(), replyMock);
+}
#include <gtest/gtest.h>
#include <type_traits>
+#include "config.h"
#include "private/asyncdummystorage.hpp"
#include "private/asyncstorageimpl.hpp"
#include "private/createlogger.hpp"
#include "private/logger.hpp"
#include "private/redis/asyncredisstorage.hpp"
#include "private/tst/enginemock.hpp"
+#include "private/tst/asyncdatabasediscoverymock.hpp"
#include "private/tst/databaseconfigurationmock.hpp"
#include "private/tst/namespaceconfigurationsmock.hpp"
std::shared_ptr<StrictMock<EngineMock>> engineMock;
std::shared_ptr<DatabaseConfiguration> dummyDatabaseConfiguration;
std::shared_ptr<StrictMock<NamespaceConfigurationsMock>> namespaceConfigurationsMock;
+ std::shared_ptr<NiceMock<AsyncDatabaseDiscoveryMock>> discoveryMock;
int fd;
AsyncStorage::Namespace ns;
Engine::Callback storedCallback;
engineMock(std::make_shared<StrictMock<EngineMock>>()),
dummyDatabaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
namespaceConfigurationsMock(std::make_shared<StrictMock<NamespaceConfigurationsMock>>()),
+ discoveryMock(std::make_shared<NiceMock<AsyncDatabaseDiscoveryMock>>()),
fd(10),
ns("someKnownNamespace"),
logger(createLogger(SDL_LOG_PREFIX))
{
dummyDatabaseConfiguration->checkAndApplyDbType("redis-standalone");
dummyDatabaseConfiguration->checkAndApplyServerAddress("dummydatabaseaddress.local");
- asyncStorageImpl.reset(new AsyncStorageImpl(engineMock, boost::none, dummyDatabaseConfiguration, namespaceConfigurationsMock, logger));
+ asyncStorageImpl.reset(new AsyncStorageImpl(engineMock,
+ boost::none,
+ dummyDatabaseConfiguration,
+ namespaceConfigurationsMock,
+ logger,
+ std::bind(&AsyncStorageImplTest::asyncDatabaseDiscoveryCreator,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3)));
+ }
+
+ std::shared_ptr<redis::AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine>,
+ const DatabaseConfiguration&,
+ std::shared_ptr<Logger>)
+ {
+ return discoveryMock;
}
void expectNamespaceConfigurationIsDbBackendUseEnabled_returnFalse()
{
InSequence dummy;
expectNamespaceConfigurationIsDbBackendUseEnabled_returnTrue();
- //AsyncRedisStorage creation causes AsyncHiredisDatabaseDiscovery to post stateChanged callback
- expectPostCallback();
AsyncStorage& returnedHandler1 = asyncStorageImpl->getOperationHandler(ns);
EXPECT_EQ(typeid(AsyncRedisStorage&), typeid(returnedHandler1));