*/
#include <arpa/inet.h>
+#include <boost/algorithm/string.hpp>
#include <iostream>
#include <string>
+#include <vector>
#include <sdl/asyncstorage.hpp>
#include "private/abort.hpp"
#include "private/hostandport.hpp"
std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
- std::shared_ptr<Logger> logger);
+ std::shared_ptr<Logger> logger,
+ bool usePermanentCommandCallbacks);
+
+ struct SubscribeReply
+ {
+ enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
+ Type type;
+ std::string message;
+
+ SubscribeReply(): type(Type::UNKNOWN) { }
+ };
+
+ std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
+
+ std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
}
AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
boost::none,
DatabaseInfo::Discovery::SENTINEL})),
contentsBuilder(contentsBuilder),
+ subscribeRetryTimer(*engine),
+ subscribeRetryTimerDuration(std::chrono::seconds(1)),
masterInquiryRetryTimer(*engine),
masterInquiryRetryTimerDuration(std::chrono::seconds(1))
{
+ subscriber = asyncCommandDispatcherCreator(*engine,
+ databaseInfo,
+ contentsBuilder,
+ logger,
+ true);
dispatcher = asyncCommandDispatcherCreator(*engine,
databaseInfo,
contentsBuilder,
- logger);
+ logger,
+ false);
+}
+
+AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
+{
+ if (subscriber)
+ subscriber->disableCommandCallbacks();
+ if (dispatcher)
+ dispatcher->disableCommandCallbacks();
+ stateChangedCb = nullptr;
}
void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
{
stateChangedCb = cb;
- dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+ subscriber->registerDisconnectCb([this]()
+ {
+ subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+ });
+ subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
}
void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
stateChangedCb = nullptr;
}
+void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
+{
+ subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ "dummyNamespace", // Not meaningful for Sentinel
+ contentsBuilder->build("SUBSCRIBE", "+switch-master"));
+}
+
+void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
+ const Reply& reply)
+{
+ if (!error)
+ {
+ auto subscribeReply = parseSubscribeReply(reply, *logger);
+ if (subscribeReply)
+ {
+ switch (subscribeReply->type)
+ {
+ case (SubscribeReply::Type::SUBSCRIBE_REPLY):
+ {
+ dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+ break;
+ }
+ case (SubscribeReply::Type::NOTIFICATION):
+ {
+ auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
+ if (hostAndPort)
+ {
+ auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
+ DatabaseInfo::Type::SINGLE,
+ boost::none,
+ DatabaseInfo::Discovery::SENTINEL}));
+ if (stateChangedCb)
+ stateChangedCb(databaseInfo);
+ }
+ else
+ SHAREDDATALAYER_ABORT("Notification message parsing error.");
+ break;
+ }
+ case (SubscribeReply::Type::UNKNOWN):
+ {
+ logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
+ SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
+ }
+ }
+ }
+ else
+ SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
+ }
+ else
+ subscribeRetryTimer.arm(
+ subscribeRetryTimerDuration,
+ std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+}
+
void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
{
dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
this,
std::placeholders::_1,
std::placeholders::_2),
- "dummyNamespace", // Not meaningful for SENTINEL commands
+ "dummyNamespace", // Not meaningful for Sentinel
contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
}
std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
- std::shared_ptr<Logger> logger)
+ std::shared_ptr<Logger> logger,
+ bool usePermanentCommandCallbacks)
{
return AsyncCommandDispatcher::create(engine,
databaseInfo,
contentsBuilder,
- false,
+ usePermanentCommandCallbacks,
logger,
true);
}
+ std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
+ {
+ // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
+ auto replyType = reply.getType();
+ if (replyType == Reply::Type::ARRAY)
+ {
+ auto& replyVector(*reply.getArray());
+ auto firstElementType = replyVector[0]->getType();
+ if (firstElementType == Reply::Type::STRING)
+ {
+ auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
+ auto kind(replyVector[0]->getString()->str);
+ if (kind == "subscribe")
+ {
+ subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
+ return subscribeReply;
+ }
+ else if (kind == "message")
+ {
+ subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
+ auto thirdElementType = replyVector[2]->getType();
+ if (thirdElementType == Reply::Type::STRING)
+ {
+ subscribeReply->message = replyVector[2]->getString()->str;
+ return subscribeReply;
+ }
+ else
+ logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
+ }
+ else
+ logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
+ }
+ else
+ logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
+ << static_cast<int>(firstElementType) << std::endl;
+ }
+ else
+ logger.debug() << "Invalid SUBSCRIBE reply type: "
+ << static_cast<int>(replyType) << std::endl;
+ return nullptr;
+ }
+
std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
{
auto replyType = reply.getType();
<< static_cast<int>(replyType) << std::endl;
return nullptr;
}
+
+ std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
+ {
+ std::vector<std::string> splittedMessage;
+ boost::split(splittedMessage, message, boost::is_any_of(" "));
+ if (splittedMessage.size() == 5)
+ {
+ auto host = splittedMessage[3];
+ auto port = splittedMessage[4];
+ try
+ {
+ return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
+ }
+ catch (const std::exception& e)
+ {
+ logger.debug() << "Invalid host or port in notification message, host: "
+ << host << ", port: " << port
+ << ", exception: " << e.what() << std::endl;
+ }
+ }
+ else
+ logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
+ return nullptr;
+ }
}
#include <gtest/gtest.h>
#include <arpa/inet.h>
+#include <string>
#include <sdl/asyncstorage.hpp>
#include "private/createlogger.hpp"
#include "private/hostandport.hpp"
public:
std::unique_ptr<AsyncSentinelDatabaseDiscovery> asyncSentinelDatabaseDiscovery;
std::shared_ptr<StrictMock<EngineMock>> engineMock;
+ std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> subscriberMock;
std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> dispatcherMock;
std::shared_ptr<StrictMock<ContentsBuilderMock>> contentsBuilderMock;
std::shared_ptr<Logger> logger;
Contents contents;
+ AsyncCommandDispatcher::ConnectAck subscriberConnectAck;
+ AsyncCommandDispatcher::DisconnectCb subscriberDisconnectCb;
AsyncCommandDispatcher::ConnectAck dispatcherConnectAck;
- AsyncCommandDispatcher::CommandCb savedCommandCb;
- ReplyMock replyMock;
+ AsyncCommandDispatcher::CommandCb savedSubscriberCommandCb;
+ AsyncCommandDispatcher::CommandCb savedDispatcherCommandCb;
+ ReplyMock masterInquiryReplyMock;
std::string someHost;
uint16_t somePort;
+ std::string someOtherHost;
+ uint16_t someOtherPort;
Reply::DataItem hostDataItem;
Reply::DataItem portDataItem;
std::shared_ptr<ReplyMock> masterInquiryReplyHost;
std::shared_ptr<ReplyMock> masterInquiryReplyPort;
Reply::ReplyVector masterInquiryReply;
Timer::Duration expectedMasterInquiryRetryTimerDuration;
- Timer::Callback savedConnectionRetryTimerCallback;
+ Timer::Callback savedMasterInquiryRetryTimerCallback;
+ // Mocks for SUBSCRIBE command replies are a bit complicated, because reply might have several
+ // meanings/structures: https://redis.io/topics/pubsub#format-of-pushed-messages
+ ReplyMock subscribeReplyMock;
+ std::shared_ptr<ReplyMock> subscribeReplyArrayElement0;
+ std::shared_ptr<ReplyMock> subscribeReplyArrayElement1;
+ std::shared_ptr<ReplyMock> subscribeReplyArrayElement2;
+ Reply::ReplyVector subscribeReplyVector;
+ Reply::DataItem subscribeDataItem;
+ ReplyMock notificationReplyMock;
+ std::shared_ptr<ReplyMock> notificationReplyArrayElement0;
+ std::shared_ptr<ReplyMock> notificationReplyArrayElement1;
+ std::shared_ptr<ReplyMock> notificationReplyArrayElement2;
+ Reply::ReplyVector notificationReplyVector;
+ Reply::DataItem notificationDataItem;
+ std::string notificationMessage;
+ Reply::DataItem notificationMessageDataItem;
+ Timer::Duration expectedSubscribeRetryTimerDuration;
+ Timer::Callback savedSubscribeRetryTimerCallback;
AsyncSentinelDatabaseDiscoveryBaseTest():
engineMock(std::make_shared<StrictMock<EngineMock>>()),
- dispatcherMock(std::make_shared<StrictMock<AsyncCommandDispatcherMock>>()),
contentsBuilderMock(std::make_shared<StrictMock<ContentsBuilderMock>>(AsyncStorage::SEPARATOR)),
logger(createLogger(SDL_LOG_PREFIX)),
contents({{"aaa","bbb"},{3,3}}),
someHost("somehost"),
somePort(1234),
+ someOtherHost("someotherhost"),
+ someOtherPort(5678),
hostDataItem({someHost,ReplyStringLength(someHost.length())}),
portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}),
masterInquiryReplyHost(std::make_shared<ReplyMock>()),
masterInquiryReplyPort(std::make_shared<ReplyMock>()),
- expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1))
+ expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)),
+ subscribeReplyArrayElement0(std::make_shared<ReplyMock>()),
+ subscribeReplyArrayElement1(std::make_shared<ReplyMock>()),
+ subscribeReplyArrayElement2(std::make_shared<ReplyMock>()),
+ subscribeDataItem({"subscribe",9}),
+ notificationReplyArrayElement0(std::make_shared<ReplyMock>()),
+ notificationReplyArrayElement1(std::make_shared<ReplyMock>()),
+ notificationReplyArrayElement2(std::make_shared<ReplyMock>()),
+ notificationDataItem({"message",7}),
+ notificationMessage("mymaster " + someHost + " " + std::to_string(somePort) + " " + someOtherHost + " " + std::to_string(someOtherPort)),
+ notificationMessageDataItem({notificationMessage, ReplyStringLength(notificationMessage.length())}),
+ expectedSubscribeRetryTimerDuration(std::chrono::seconds(1))
{
masterInquiryReply.push_back(masterInquiryReplyHost);
masterInquiryReply.push_back(masterInquiryReplyPort);
+ subscribeReplyVector.push_back(subscribeReplyArrayElement0);
+ subscribeReplyVector.push_back(subscribeReplyArrayElement1);
+ subscribeReplyVector.push_back(subscribeReplyArrayElement2);
+ notificationReplyVector.push_back(notificationReplyArrayElement0);
+ notificationReplyVector.push_back(notificationReplyArrayElement1);
+ notificationReplyVector.push_back(notificationReplyArrayElement2);
}
virtual ~AsyncSentinelDatabaseDiscoveryBaseTest()
{
}
- std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine&,
- const DatabaseInfo&,
- std::shared_ptr<ContentsBuilder>)
+ std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator()
{
// @TODO Add database info checking when configuration support for sentinel is added.
- newDispatcherCreated();
- return dispatcherMock;
+ if (!subscriberMock)
+ {
+ subscriberMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+ newDispatcherCreated();
+ return subscriberMock;
+ }
+ if (!dispatcherMock)
+ {
+ dispatcherMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+ newDispatcherCreated();
+ return dispatcherMock;
+ }
+ return nullptr;
}
MOCK_METHOD0(newDispatcherCreated, void());
- void expectNewDispatcherCreated()
+ void expectDispatchersCreated()
{
EXPECT_CALL(*this, newDispatcherCreated())
- .Times(1);
+ .Times(2);
+ }
+
+ void expectSubscriberWaitConnectedAsync()
+ {
+ EXPECT_CALL(*subscriberMock, waitConnectedAsync(_))
+ .Times(1)
+ .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck)
+ {
+ subscriberConnectAck = connectAck;
+ }));
+ }
+
+ void expectSubscriberRegisterDisconnectCb()
+ {
+ EXPECT_CALL(*subscriberMock, registerDisconnectCb(_))
+ .Times(1)
+ .WillOnce(Invoke([this](const AsyncCommandDispatcher::DisconnectCb& disconnectCb)
+ {
+ subscriberDisconnectCb = disconnectCb;
+ }));
}
void expectDispatcherWaitConnectedAsync()
}));
}
+ void expectContentsBuild(const std::string& string,
+ const std::string& string2)
+ {
+ EXPECT_CALL(*contentsBuilderMock, build(string, string2))
+ .Times(1)
+ .WillOnce(Return(contents));
+ }
+
void expectContentsBuild(const std::string& string,
const std::string& string2,
const std::string& string3)
.WillOnce(Return(contents));
}
- void expectDispatchAsync()
+ void expectSubscriberDispatchAsync()
+ {
+ EXPECT_CALL(*subscriberMock, dispatchAsync(_, _, contents))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&savedSubscriberCommandCb));
+ }
+
+ void expectDispatcherDispatchAsync()
{
EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents))
.Times(1)
- .WillOnce(SaveArg<0>(&savedCommandCb));
+ .WillOnce(SaveArg<0>(&savedDispatcherCommandCb));
+ }
+
+ void expectSubscribeNotifications()
+ {
+ expectContentsBuild("SUBSCRIBE", "+switch-master");
+ expectSubscriberDispatchAsync();
}
void expectMasterInquiry()
{
expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster");
- expectDispatchAsync();
+ expectDispatcherDispatchAsync();
}
MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&));
- void expectStateChangedCb()
+ void expectStateChangedCb(const std::string& host, uint16_t port)
{
EXPECT_CALL(*this, stateChangedCb(_))
.Times(1)
- .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo)
+ .WillOnce(Invoke([this, host, port](const DatabaseInfo& databaseInfo)
{
- EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }),
+ EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(host, htons(port)) }),
ContainerEq(databaseInfo.hosts));
EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type);
EXPECT_EQ(boost::none, databaseInfo.ns);
}));
}
- void expectGetReplyType(ReplyMock& mock, const Reply::Type& type)
+ void expectMasterIquiryReply()
{
- EXPECT_CALL(mock, getType())
- .Times(1)
- .WillOnce(Return(type));
+ expectGetType(masterInquiryReplyMock, Reply::Type::ARRAY);
+ expectGetArray(masterInquiryReplyMock, masterInquiryReply);
+ expectGetType(*masterInquiryReplyHost, Reply::Type::STRING);
+ expectGetString(*masterInquiryReplyHost, hostDataItem);
+ expectGetType(*masterInquiryReplyPort, Reply::Type::STRING);
+ expectGetString(*masterInquiryReplyPort, portDataItem);
}
- void expectGetReplyArray_ReturnMasterInquiryReply()
+ void expectMasterInquiryRetryTimer()
{
- EXPECT_CALL(replyMock, getArray())
+ EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
.Times(1)
- .WillOnce(Return(&masterInquiryReply));
+ .WillOnce(SaveArg<2>(&savedMasterInquiryRetryTimerCallback));
}
- void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item)
+ void expectSubscribeRetryTimer()
{
- EXPECT_CALL(mock, getString())
+ EXPECT_CALL(*engineMock, armTimer(_, expectedSubscribeRetryTimerDuration, _))
.Times(1)
- .WillOnce(Return(&item));
+ .WillOnce(SaveArg<2>(&savedSubscribeRetryTimerCallback));
}
- void expectMasterIquiryReply()
+ void setStateChangedCbExpectsBeforeMasterInquiry()
{
- expectGetReplyType(replyMock, Reply::Type::ARRAY);
- expectGetReplyArray_ReturnMasterInquiryReply();
- expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING);
- expectGetReplyString(*masterInquiryReplyHost, hostDataItem);
- expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING);
- expectGetReplyString(*masterInquiryReplyPort, portDataItem);
- }
-
- void expectMasterInquiryRetryTimer()
- {
- EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
- .Times(1)
- .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+ expectSubscriberRegisterDisconnectCb();
+ expectSubscriberWaitConnectedAsync();
+ asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::stateChangedCb,
+ this,
+ std::placeholders::_1));
+ expectSubscribeNotifications();
+ subscriberConnectAck();
+ expectSubscribeReply();
+ expectDispatcherWaitConnectedAsync();
+ savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+ expectMasterInquiry();
}
void setDefaultResponsesForMasterInquiryReplyParsing()
{
- ON_CALL(replyMock, getType())
+ ON_CALL(masterInquiryReplyMock, getType())
.WillByDefault(Return(Reply::Type::ARRAY));
- ON_CALL(replyMock, getArray())
+ ON_CALL(masterInquiryReplyMock, getArray())
.WillByDefault(Return(&masterInquiryReply));
ON_CALL(*masterInquiryReplyHost, getType())
.WillByDefault(Return(Reply::Type::STRING));
ON_CALL(*masterInquiryReplyHost, getString())
.WillByDefault(Return(&portDataItem));
}
+
+ void expectGetType(ReplyMock& mock, const Reply::Type& type)
+ {
+ EXPECT_CALL(mock, getType())
+ .Times(1)
+ .WillOnce(Return(type));
+ }
+
+ void expectGetString(ReplyMock& mock, const Reply::DataItem& item)
+ {
+ EXPECT_CALL(mock, getString())
+ .Times(1)
+ .WillOnce(Return(&item));
+ }
+
+ void expectGetInteger(ReplyMock& mock, int value)
+ {
+ EXPECT_CALL(mock, getInteger())
+ .Times(1)
+ .WillOnce(Return(value));
+ }
+
+ void expectGetArray(ReplyMock& mock, Reply::ReplyVector& replyVector)
+ {
+ EXPECT_CALL(mock, getArray())
+ .Times(1)
+ .WillOnce(Return(&replyVector));
+ }
+
+ void expectSubscribeReply()
+ {
+ expectGetType(subscribeReplyMock, Reply::Type::ARRAY);
+ expectGetArray(subscribeReplyMock, subscribeReplyVector);
+ expectGetType(*subscribeReplyArrayElement0, Reply::Type::STRING);
+ expectGetString(*subscribeReplyArrayElement0, subscribeDataItem);
+ }
+
+ void expectNotificationReply()
+ {
+ expectGetType(notificationReplyMock, Reply::Type::ARRAY);
+ expectGetArray(notificationReplyMock, notificationReplyVector);
+ expectGetType(*notificationReplyArrayElement0, Reply::Type::STRING);
+ expectGetString(*notificationReplyArrayElement0, notificationDataItem);
+ expectGetType(*notificationReplyArrayElement2, Reply::Type::STRING);
+ expectGetString(*notificationReplyArrayElement2, notificationMessageDataItem);
+ }
+
+ void setDefaultResponsesForNotificationReplyParsing()
+ {
+ ON_CALL(notificationReplyMock, getType())
+ .WillByDefault(Return(Reply::Type::ARRAY));
+ ON_CALL(notificationReplyMock, getArray())
+ .WillByDefault(Return(¬ificationReplyVector));
+ ON_CALL(*notificationReplyArrayElement0, getType())
+ .WillByDefault(Return(Reply::Type::STRING));
+ ON_CALL(*notificationReplyArrayElement0, getString())
+ .WillByDefault(Return(¬ificationDataItem));
+ ON_CALL(*notificationReplyArrayElement2, getType())
+ .WillByDefault(Return(Reply::Type::STRING));
+ ON_CALL(*notificationReplyArrayElement2, getString())
+ .WillByDefault(Return(¬ificationMessageDataItem));
+ }
};
class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest
public:
AsyncSentinelDatabaseDiscoveryTest()
{
- expectNewDispatcherCreated();
+ expectDispatchersCreated();
asyncSentinelDatabaseDiscovery.reset(
new AsyncSentinelDatabaseDiscovery(
engineMock,
logger,
std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator,
- this,
- std::placeholders::_1,
- std::placeholders::_2,
- std::placeholders::_3),
+ this),
contentsBuilderMock));
}
+
+ ~AsyncSentinelDatabaseDiscoveryTest()
+ {
+ EXPECT_CALL(*subscriberMock, disableCommandCallbacks())
+ .Times(1);
+ EXPECT_CALL(*dispatcherMock, disableCommandCallbacks())
+ .Times(1);
+ }
+ };
+
+ class AsyncSentinelDatabaseDiscoveryInListeningModeTest: public AsyncSentinelDatabaseDiscoveryTest
+ {
+ public:
+ AsyncSentinelDatabaseDiscoveryInListeningModeTest()
+ {
+ InSequence dummy;
+ setStateChangedCbExpectsBeforeMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterIquiryReply();
+ expectStateChangedCb(someHost, somePort);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+ }
};
using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest;
+
+ using AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest = AsyncSentinelDatabaseDiscoveryInListeningModeTest;
}
TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable)
EXPECT_TRUE((std::is_base_of<AsyncDatabaseDiscovery, AsyncSentinelDatabaseDiscovery>::value));
}
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, SettingChangedCallbackTriggersSentinelNotificationsSubscriptionAndMasterInquiry)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
expectMasterIquiryReply();
- expectStateChangedCb();
- savedCommandCb(std::error_code(), replyMock);
+ expectStateChangedCb(someHost, somePort);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
}
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, MasterInquiryErrorTriggersRetry)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
expectMasterInquiryRetryTimer();
- savedCommandCb(getWellKnownErrorCode(), replyMock);
+ savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
expectMasterInquiry();
- savedConnectionRetryTimerCallback();
+ savedMasterInquiryRetryTimerCallback();
expectMasterIquiryReply();
- expectStateChangedCb();
- savedCommandCb(std::error_code(), replyMock);
+ expectStateChangedCb(someHost, somePort);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
}
TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
- ON_CALL(replyMock, getType())
+ ON_CALL(masterInquiryReplyMock, getType())
.WillByDefault(Return(Reply::Type::NIL));
- EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+ EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
}
TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
setDefaultResponsesForMasterInquiryReplyParsing();
ON_CALL(*masterInquiryReplyHost, getType())
.WillByDefault(Return(Reply::Type::NIL));
- EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+ EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
}
TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
setDefaultResponsesForMasterInquiryReplyParsing();
ON_CALL(*masterInquiryReplyPort, getType())
.WillByDefault(Return(Reply::Type::NIL));
- EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+ EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
}
TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
setDefaultResponsesForMasterInquiryReplyParsing();
std::string invalidPort("invalidPort");
Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())});
ON_CALL(*masterInquiryReplyPort, getString())
.WillByDefault(Return(&invalidPortDataItem));
- EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+ EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
}
TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared)
{
InSequence dummy;
- expectDispatcherWaitConnectedAsync();
- asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
- this,
- std::placeholders::_1));
- expectMasterInquiry();
+ setStateChangedCbExpectsBeforeMasterInquiry();
dispatcherConnectAck();
expectMasterInquiryRetryTimer();
- savedCommandCb(getWellKnownErrorCode(), replyMock);
+ savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
expectMasterInquiry();
- savedConnectionRetryTimerCallback();
+ savedMasterInquiryRetryTimerCallback();
expectMasterIquiryReply();
asyncSentinelDatabaseDiscovery->clearStateChangedCb();
EXPECT_CALL(*this, stateChangedCb(_))
.Times(0);
- savedCommandCb(std::error_code(), replyMock);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, ChangeNotificationFromSentinel)
+{
+ InSequence dummy;
+ setStateChangedCbExpectsBeforeMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterIquiryReply();
+ expectStateChangedCb(someHost, somePort);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+ expectNotificationReply();
+ expectStateChangedCb(someOtherHost, someOtherPort);
+ savedSubscriberCommandCb(std::error_code(), notificationReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscribeCommandErrorTriggersRetry)
+{
+ InSequence dummy;
+ expectSubscribeRetryTimer();
+ savedSubscriberCommandCb(getWellKnownErrorCode(), subscribeReplyMock);
+ expectSubscribeNotifications();
+ savedSubscribeRetryTimerCallback();
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidReplyType)
+{
+ InSequence dummy;
+ ON_CALL(notificationReplyMock, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKindElementType)
+{
+ InSequence dummy;
+ setDefaultResponsesForNotificationReplyParsing();
+ ON_CALL(*notificationReplyArrayElement0, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKind)
+{
+ InSequence dummy;
+ setDefaultResponsesForNotificationReplyParsing();
+ std::string invalidKind("invalidKind");
+ Reply::DataItem invalidKindDataItem({invalidKind,ReplyStringLength(invalidKind.length())});
+ ON_CALL(*notificationReplyArrayElement0, getString())
+ .WillByDefault(Return(&invalidKindDataItem));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageElementType)
+{
+ InSequence dummy;
+ setDefaultResponsesForNotificationReplyParsing();
+ ON_CALL(*notificationReplyArrayElement2, getType())
+ .WillByDefault(Return(Reply::Type::NIL));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageStructure)
+{
+ InSequence dummy;
+ setDefaultResponsesForNotificationReplyParsing();
+ std::string invalidMessage("mymaster oldHost 1234 5678");
+ auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+ ON_CALL(*notificationReplyArrayElement2, getString())
+ .WillByDefault(Return(&invalidMessageDataItem));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidPort)
+{
+ InSequence dummy;
+ setDefaultResponsesForNotificationReplyParsing();
+ std::string invalidMessage("mymaster oldHost 1234 newHost invalidPort");
+ auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+ ON_CALL(*notificationReplyArrayElement2, getString())
+ .WillByDefault(Return(&invalidMessageDataItem));
+ EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscriberDisconnectCallbackTriggersSubscriptionRenewal)
+{
+ InSequence dummy;
+ expectSubscriberWaitConnectedAsync();
+ subscriberDisconnectCb();
+ expectSubscribeNotifications();
+ subscriberConnectAck();
+ expectSubscribeReply();
+ expectDispatcherWaitConnectedAsync();
+ savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+ expectMasterInquiry();
+ dispatcherConnectAck();
+ expectMasterIquiryReply();
+ expectStateChangedCb(someHost, somePort);
+ savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
}