From: Rolf Badorek Date: Mon, 23 Sep 2019 11:14:56 +0000 (+0300) Subject: Add Sentinel change notification handling logic X-Git-Tag: 1.2.1~24 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=b7f4971cb7d84e8288140901f4a9dfa773292421;p=ric-plt%2Fsdl.git Add Sentinel change notification handling logic When new Redis master has been promoted, Sentinel publishes notification to '+switch-master' channel. Refer to below web page for further details: https://redis.io/topics/sentinel 'AsyncSentinelDatabaseDiscovery' will now subscribe notifications for above mentioned channel. Notification contains information of new Redis master, which is parsed from message and sent to upper layer via 'StateChangedCb' callback (if callback is set). When notifications are subscribed from Redis (from Sentinel this case), connection will go to "subscribed state", and only some pub/sub related commands are allowed. Due the above reason, we have two connections (command dispatchers). One to subscribe notifications and the other for Sentinel commands, like master inquiry. Refer to below web page for further details: https://redis.io/topics/pubsub In case that subscriber connection goes down, subscription for notifications is renewed once connection to Sentinel is working again. Extra master inquiry will be made because we might be missed notifications during connection cut. Latest master address is refreshed via 'StateChangedCb', even if has not changed compared to last informed address. This could be optimized, but as being very rare situation was not seen worth of extra logic. In case that the other connection (used for Sentinel commands) is cut, the related command dispatcher will re-connect in the background. Possible Sentinel commands during connection cut will fail and trigger retry after short delay (per already existing implementation). If some notifications are missed due some other reason than connection cut, SDL might go to the state that operations will made to Redis slave. In this situation write operations will fail with a new internal 'AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE' error code, which is mapped to 'shareddatalayer::Error::BACKEND_FAILURE'. Recovery instructions adjusted a bit, so that re-creating SDL API instance is not optional recovery step (it is the only way to recover from above mentioned situation currently). Sentinel support is still disabled, missing implementation will be added soon as a separate commit(s). Signed-off-by: Rolf Badorek Change-Id: I1bb9e121985ee22278e780e50ab13f88acdc65c5 --- diff --git a/include/private/error.hpp b/include/private/error.hpp index 6b28f75..70f6b1a 100644 --- a/include/private/error.hpp +++ b/include/private/error.hpp @@ -67,6 +67,7 @@ namespace shareddatalayer DATASET_LOADING, NOT_CONNECTED, IO_ERROR, + WRITING_TO_SLAVE, //Keep this always as last item. Used in unit tests to loop all enum values. END_MARKER }; diff --git a/include/private/redis/asyncsentineldatabasediscovery.hpp b/include/private/redis/asyncsentineldatabasediscovery.hpp index 527fa4e..98d1d23 100644 --- a/include/private/redis/asyncsentineldatabasediscovery.hpp +++ b/include/private/redis/asyncsentineldatabasediscovery.hpp @@ -44,7 +44,8 @@ namespace shareddatalayer using AsyncCommandDispatcherCreator = std::function(Engine& engine, const redis::DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, - std::shared_ptr logger)>; + std::shared_ptr logger, + bool usePermanentCommandCallbacks)>; AsyncSentinelDatabaseDiscovery(const AsyncSentinelDatabaseDiscovery&) = delete; @@ -58,7 +59,7 @@ namespace shareddatalayer const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator, std::shared_ptr contentsBuilder); - ~AsyncSentinelDatabaseDiscovery() override = default; + ~AsyncSentinelDatabaseDiscovery() override; void setStateChangedCb(const StateChangedCb& stateChangedCb) override; @@ -70,11 +71,18 @@ namespace shareddatalayer std::shared_ptr logger; StateChangedCb stateChangedCb; DatabaseInfo databaseInfo; + std::shared_ptr subscriber; std::shared_ptr dispatcher; std::shared_ptr contentsBuilder; + Timer subscribeRetryTimer; + Timer::Duration subscribeRetryTimerDuration; Timer masterInquiryRetryTimer; Timer::Duration masterInquiryRetryTimerDuration; + void subscribeNotifications(); + + void subscribeAck(const std::error_code& error, const Reply& reply); + void sendMasterInquiry(); void masterInquiryAck(const std::error_code& error, const Reply& reply); diff --git a/include/sdl/errorqueries.hpp b/include/sdl/errorqueries.hpp index d8b7e01..5fdfadd 100644 --- a/include/sdl/errorqueries.hpp +++ b/include/sdl/errorqueries.hpp @@ -62,9 +62,9 @@ namespace shareddatalayer * shareddatalayer::Error::BACKEND_FAILURE:
* shareddatalayer delivered the request to the backend data storage but the backend data storage failed to process the request. * In case of a write type request, data in the backend data storage may or may not have been altered.
- * Client is advised to try the operation again later. Optionally client can also re-create the used shareddatalayer instance. + * Client is advised to try the operation again later. If also re-tries are failing client should re-create the used shareddatalayer instance. * It is possible that the system does not automatically recover from this type of error situations. Therefore client is advised - * to escalate the problem to O&M if operation does not succeed after several retries.
+ * to escalate the problem to O&M if operation does not succeed after above mentioned recovery actions.
* When shareddatalayer operations work again, client can choose how to best address the possible loss of consistency * (see shareddatalayer::Error::OPERATION_INTERRUPTED for possible options). * diff --git a/src/error.cpp b/src/error.cpp index 347e651..d6bbaa8 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -109,6 +109,8 @@ namespace return "redis error"; case AsyncRedisCommandDispatcherErrorCode::IO_ERROR: return "redis I/O error"; + case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE: + return "writing to slave"; case AsyncRedisCommandDispatcherErrorCode::END_MARKER: logErrorOnce("AsyncRedisCommandDispatcherErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)"); return "unsupported error code for message()"; @@ -137,6 +139,8 @@ namespace return InternalError::BACKEND_ERROR; case AsyncRedisCommandDispatcherErrorCode::IO_ERROR: return InternalError::BACKEND_ERROR; + case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE: + return InternalError::BACKEND_ERROR; case AsyncRedisCommandDispatcherErrorCode::END_MARKER: logErrorOnce("AsyncRedisCommandDispatcherErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)"); return InternalError::SDL_ERROR_CODE_LOGIC_ERROR; diff --git a/src/redis/asyncsentineldatabasediscovery.cpp b/src/redis/asyncsentineldatabasediscovery.cpp index 05815e2..be51c5e 100644 --- a/src/redis/asyncsentineldatabasediscovery.cpp +++ b/src/redis/asyncsentineldatabasediscovery.cpp @@ -15,8 +15,10 @@ */ #include +#include #include #include +#include #include #include "private/abort.hpp" #include "private/hostandport.hpp" @@ -34,9 +36,23 @@ namespace std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, - std::shared_ptr logger); + std::shared_ptr logger, + bool usePermanentCommandCallbacks); + + struct SubscribeReply + { + enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION }; + Type type; + std::string message; + + SubscribeReply(): type(Type::UNKNOWN) { } + }; + + std::unique_ptr parseSubscribeReply(const Reply& reply, Logger& logger); std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger); + + std::unique_ptr parseNotificationMessage(const std::string& message, Logger& logger); } AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, @@ -60,19 +76,40 @@ AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptrdisableCommandCallbacks(); + if (dispatcher) + dispatcher->disableCommandCallbacks(); + stateChangedCb = nullptr; } void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb) { stateChangedCb = cb; - dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); + subscriber->registerDisconnectCb([this]() + { + subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this)); + }); + subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this)); } void AsyncSentinelDatabaseDiscovery::clearStateChangedCb() @@ -80,13 +117,70 @@ void AsyncSentinelDatabaseDiscovery::clearStateChangedCb() stateChangedCb = nullptr; } +void AsyncSentinelDatabaseDiscovery::subscribeNotifications() +{ + subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck, + this, + std::placeholders::_1, + std::placeholders::_2), + "dummyNamespace", // Not meaningful for Sentinel + contentsBuilder->build("SUBSCRIBE", "+switch-master")); +} + +void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error, + const Reply& reply) +{ + if (!error) + { + auto subscribeReply = parseSubscribeReply(reply, *logger); + if (subscribeReply) + { + switch (subscribeReply->type) + { + case (SubscribeReply::Type::SUBSCRIBE_REPLY): + { + dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); + break; + } + case (SubscribeReply::Type::NOTIFICATION): + { + auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger); + if (hostAndPort) + { + auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}), + DatabaseInfo::Type::SINGLE, + boost::none, + DatabaseInfo::Discovery::SENTINEL})); + if (stateChangedCb) + stateChangedCb(databaseInfo); + } + else + SHAREDDATALAYER_ABORT("Notification message parsing error."); + break; + } + case (SubscribeReply::Type::UNKNOWN): + { + logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl; + SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type."); + } + } + } + else + SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error."); + } + else + subscribeRetryTimer.arm( + subscribeRetryTimerDuration, + std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this)); +} + void AsyncSentinelDatabaseDiscovery::sendMasterInquiry() { dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck, this, std::placeholders::_1, std::placeholders::_2), - "dummyNamespace", // Not meaningful for SENTINEL commands + "dummyNamespace", // Not meaningful for Sentinel contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable } @@ -121,16 +215,59 @@ namespace std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, - std::shared_ptr logger) + std::shared_ptr logger, + bool usePermanentCommandCallbacks) { return AsyncCommandDispatcher::create(engine, databaseInfo, contentsBuilder, - false, + usePermanentCommandCallbacks, logger, true); } + std::unique_ptr parseSubscribeReply(const Reply& reply, Logger& logger) + { + // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages + auto replyType = reply.getType(); + if (replyType == Reply::Type::ARRAY) + { + auto& replyVector(*reply.getArray()); + auto firstElementType = replyVector[0]->getType(); + if (firstElementType == Reply::Type::STRING) + { + auto subscribeReply = std::unique_ptr(new SubscribeReply()); + auto kind(replyVector[0]->getString()->str); + if (kind == "subscribe") + { + subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY; + return subscribeReply; + } + else if (kind == "message") + { + subscribeReply->type = SubscribeReply::Type::NOTIFICATION; + auto thirdElementType = replyVector[2]->getType(); + if (thirdElementType == Reply::Type::STRING) + { + subscribeReply->message = replyVector[2]->getString()->str; + return subscribeReply; + } + else + logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl; + } + else + logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl; + } + else + logger.debug() << "Invalid first element type in SUBSCRIBE reply: " + << static_cast(firstElementType) << std::endl; + } + else + logger.debug() << "Invalid SUBSCRIBE reply type: " + << static_cast(replyType) << std::endl; + return nullptr; + } + std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger) { auto replyType = reply.getType(); @@ -169,4 +306,28 @@ namespace << static_cast(replyType) << std::endl; return nullptr; } + + std::unique_ptr parseNotificationMessage(const std::string& message, Logger& logger) + { + std::vector splittedMessage; + boost::split(splittedMessage, message, boost::is_any_of(" ")); + if (splittedMessage.size() == 5) + { + auto host = splittedMessage[3]; + auto port = splittedMessage[4]; + try + { + return std::unique_ptr(new HostAndPort(host+":"+port, 0));; + } + catch (const std::exception& e) + { + logger.debug() << "Invalid host or port in notification message, host: " + << host << ", port: " << port + << ", exception: " << e.what() << std::endl; + } + } + else + logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl; + return nullptr; + } } diff --git a/src/redis/redisgeneral.cpp b/src/redis/redisgeneral.cpp index eb837ae..f031a77 100644 --- a/src/redis/redisgeneral.cpp +++ b/src/redis/redisgeneral.cpp @@ -70,6 +70,9 @@ namespace if (startsWith("ERR Protocol error", rr->str, static_cast(rr->len))) return AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR; + if (startsWith("READONLY", rr->str, static_cast(rr->len))) + return AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE; + std::ostringstream oss; oss << "redis reply error: " << std::string(rr->str, static_cast(rr->len)); logErrorOnce(oss.str()); diff --git a/tst/asyncsentineldatabasediscovery_test.cpp b/tst/asyncsentineldatabasediscovery_test.cpp index dcf35c7..ab48284 100644 --- a/tst/asyncsentineldatabasediscovery_test.cpp +++ b/tst/asyncsentineldatabasediscovery_test.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include "private/createlogger.hpp" #include "private/hostandport.hpp" @@ -39,60 +40,131 @@ namespace public: std::unique_ptr asyncSentinelDatabaseDiscovery; std::shared_ptr> engineMock; + std::shared_ptr> subscriberMock; std::shared_ptr> dispatcherMock; std::shared_ptr> contentsBuilderMock; std::shared_ptr logger; Contents contents; + AsyncCommandDispatcher::ConnectAck subscriberConnectAck; + AsyncCommandDispatcher::DisconnectCb subscriberDisconnectCb; AsyncCommandDispatcher::ConnectAck dispatcherConnectAck; - AsyncCommandDispatcher::CommandCb savedCommandCb; - ReplyMock replyMock; + AsyncCommandDispatcher::CommandCb savedSubscriberCommandCb; + AsyncCommandDispatcher::CommandCb savedDispatcherCommandCb; + ReplyMock masterInquiryReplyMock; std::string someHost; uint16_t somePort; + std::string someOtherHost; + uint16_t someOtherPort; Reply::DataItem hostDataItem; Reply::DataItem portDataItem; std::shared_ptr masterInquiryReplyHost; std::shared_ptr masterInquiryReplyPort; Reply::ReplyVector masterInquiryReply; Timer::Duration expectedMasterInquiryRetryTimerDuration; - Timer::Callback savedConnectionRetryTimerCallback; + Timer::Callback savedMasterInquiryRetryTimerCallback; + // Mocks for SUBSCRIBE command replies are a bit complicated, because reply might have several + // meanings/structures: https://redis.io/topics/pubsub#format-of-pushed-messages + ReplyMock subscribeReplyMock; + std::shared_ptr subscribeReplyArrayElement0; + std::shared_ptr subscribeReplyArrayElement1; + std::shared_ptr subscribeReplyArrayElement2; + Reply::ReplyVector subscribeReplyVector; + Reply::DataItem subscribeDataItem; + ReplyMock notificationReplyMock; + std::shared_ptr notificationReplyArrayElement0; + std::shared_ptr notificationReplyArrayElement1; + std::shared_ptr notificationReplyArrayElement2; + Reply::ReplyVector notificationReplyVector; + Reply::DataItem notificationDataItem; + std::string notificationMessage; + Reply::DataItem notificationMessageDataItem; + Timer::Duration expectedSubscribeRetryTimerDuration; + Timer::Callback savedSubscribeRetryTimerCallback; AsyncSentinelDatabaseDiscoveryBaseTest(): engineMock(std::make_shared>()), - dispatcherMock(std::make_shared>()), contentsBuilderMock(std::make_shared>(AsyncStorage::SEPARATOR)), logger(createLogger(SDL_LOG_PREFIX)), contents({{"aaa","bbb"},{3,3}}), someHost("somehost"), somePort(1234), + someOtherHost("someotherhost"), + someOtherPort(5678), hostDataItem({someHost,ReplyStringLength(someHost.length())}), portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}), masterInquiryReplyHost(std::make_shared()), masterInquiryReplyPort(std::make_shared()), - expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)) + expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)), + subscribeReplyArrayElement0(std::make_shared()), + subscribeReplyArrayElement1(std::make_shared()), + subscribeReplyArrayElement2(std::make_shared()), + subscribeDataItem({"subscribe",9}), + notificationReplyArrayElement0(std::make_shared()), + notificationReplyArrayElement1(std::make_shared()), + notificationReplyArrayElement2(std::make_shared()), + notificationDataItem({"message",7}), + notificationMessage("mymaster " + someHost + " " + std::to_string(somePort) + " " + someOtherHost + " " + std::to_string(someOtherPort)), + notificationMessageDataItem({notificationMessage, ReplyStringLength(notificationMessage.length())}), + expectedSubscribeRetryTimerDuration(std::chrono::seconds(1)) { masterInquiryReply.push_back(masterInquiryReplyHost); masterInquiryReply.push_back(masterInquiryReplyPort); + subscribeReplyVector.push_back(subscribeReplyArrayElement0); + subscribeReplyVector.push_back(subscribeReplyArrayElement1); + subscribeReplyVector.push_back(subscribeReplyArrayElement2); + notificationReplyVector.push_back(notificationReplyArrayElement0); + notificationReplyVector.push_back(notificationReplyArrayElement1); + notificationReplyVector.push_back(notificationReplyArrayElement2); } virtual ~AsyncSentinelDatabaseDiscoveryBaseTest() { } - std::shared_ptr asyncCommandDispatcherCreator(Engine&, - const DatabaseInfo&, - std::shared_ptr) + std::shared_ptr asyncCommandDispatcherCreator() { // @TODO Add database info checking when configuration support for sentinel is added. - newDispatcherCreated(); - return dispatcherMock; + if (!subscriberMock) + { + subscriberMock = std::make_shared>(); + newDispatcherCreated(); + return subscriberMock; + } + if (!dispatcherMock) + { + dispatcherMock = std::make_shared>(); + newDispatcherCreated(); + return dispatcherMock; + } + return nullptr; } MOCK_METHOD0(newDispatcherCreated, void()); - void expectNewDispatcherCreated() + void expectDispatchersCreated() { EXPECT_CALL(*this, newDispatcherCreated()) - .Times(1); + .Times(2); + } + + void expectSubscriberWaitConnectedAsync() + { + EXPECT_CALL(*subscriberMock, waitConnectedAsync(_)) + .Times(1) + .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck) + { + subscriberConnectAck = connectAck; + })); + } + + void expectSubscriberRegisterDisconnectCb() + { + EXPECT_CALL(*subscriberMock, registerDisconnectCb(_)) + .Times(1) + .WillOnce(Invoke([this](const AsyncCommandDispatcher::DisconnectCb& disconnectCb) + { + subscriberDisconnectCb = disconnectCb; + })); } void expectDispatcherWaitConnectedAsync() @@ -105,6 +177,14 @@ namespace })); } + void expectContentsBuild(const std::string& string, + const std::string& string2) + { + EXPECT_CALL(*contentsBuilderMock, build(string, string2)) + .Times(1) + .WillOnce(Return(contents)); + } + void expectContentsBuild(const std::string& string, const std::string& string2, const std::string& string3) @@ -114,28 +194,41 @@ namespace .WillOnce(Return(contents)); } - void expectDispatchAsync() + void expectSubscriberDispatchAsync() + { + EXPECT_CALL(*subscriberMock, dispatchAsync(_, _, contents)) + .Times(1) + .WillOnce(SaveArg<0>(&savedSubscriberCommandCb)); + } + + void expectDispatcherDispatchAsync() { EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents)) .Times(1) - .WillOnce(SaveArg<0>(&savedCommandCb)); + .WillOnce(SaveArg<0>(&savedDispatcherCommandCb)); + } + + void expectSubscribeNotifications() + { + expectContentsBuild("SUBSCRIBE", "+switch-master"); + expectSubscriberDispatchAsync(); } void expectMasterInquiry() { expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster"); - expectDispatchAsync(); + expectDispatcherDispatchAsync(); } MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&)); - void expectStateChangedCb() + void expectStateChangedCb(const std::string& host, uint16_t port) { EXPECT_CALL(*this, stateChangedCb(_)) .Times(1) - .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo) + .WillOnce(Invoke([this, host, port](const DatabaseInfo& databaseInfo) { - EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }), + EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(host, htons(port)) }), ContainerEq(databaseInfo.hosts)); EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type); EXPECT_EQ(boost::none, databaseInfo.ns); @@ -143,49 +236,50 @@ namespace })); } - void expectGetReplyType(ReplyMock& mock, const Reply::Type& type) + void expectMasterIquiryReply() { - EXPECT_CALL(mock, getType()) - .Times(1) - .WillOnce(Return(type)); + expectGetType(masterInquiryReplyMock, Reply::Type::ARRAY); + expectGetArray(masterInquiryReplyMock, masterInquiryReply); + expectGetType(*masterInquiryReplyHost, Reply::Type::STRING); + expectGetString(*masterInquiryReplyHost, hostDataItem); + expectGetType(*masterInquiryReplyPort, Reply::Type::STRING); + expectGetString(*masterInquiryReplyPort, portDataItem); } - void expectGetReplyArray_ReturnMasterInquiryReply() + void expectMasterInquiryRetryTimer() { - EXPECT_CALL(replyMock, getArray()) + EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _)) .Times(1) - .WillOnce(Return(&masterInquiryReply)); + .WillOnce(SaveArg<2>(&savedMasterInquiryRetryTimerCallback)); } - void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item) + void expectSubscribeRetryTimer() { - EXPECT_CALL(mock, getString()) + EXPECT_CALL(*engineMock, armTimer(_, expectedSubscribeRetryTimerDuration, _)) .Times(1) - .WillOnce(Return(&item)); + .WillOnce(SaveArg<2>(&savedSubscribeRetryTimerCallback)); } - void expectMasterIquiryReply() + void setStateChangedCbExpectsBeforeMasterInquiry() { - expectGetReplyType(replyMock, Reply::Type::ARRAY); - expectGetReplyArray_ReturnMasterInquiryReply(); - expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING); - expectGetReplyString(*masterInquiryReplyHost, hostDataItem); - expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING); - expectGetReplyString(*masterInquiryReplyPort, portDataItem); - } - - void expectMasterInquiryRetryTimer() - { - EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _)) - .Times(1) - .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback)); + expectSubscriberRegisterDisconnectCb(); + expectSubscriberWaitConnectedAsync(); + asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::stateChangedCb, + this, + std::placeholders::_1)); + expectSubscribeNotifications(); + subscriberConnectAck(); + expectSubscribeReply(); + expectDispatcherWaitConnectedAsync(); + savedSubscriberCommandCb(std::error_code(), subscribeReplyMock); + expectMasterInquiry(); } void setDefaultResponsesForMasterInquiryReplyParsing() { - ON_CALL(replyMock, getType()) + ON_CALL(masterInquiryReplyMock, getType()) .WillByDefault(Return(Reply::Type::ARRAY)); - ON_CALL(replyMock, getArray()) + ON_CALL(masterInquiryReplyMock, getArray()) .WillByDefault(Return(&masterInquiryReply)); ON_CALL(*masterInquiryReplyHost, getType()) .WillByDefault(Return(Reply::Type::STRING)); @@ -196,6 +290,68 @@ namespace ON_CALL(*masterInquiryReplyHost, getString()) .WillByDefault(Return(&portDataItem)); } + + void expectGetType(ReplyMock& mock, const Reply::Type& type) + { + EXPECT_CALL(mock, getType()) + .Times(1) + .WillOnce(Return(type)); + } + + void expectGetString(ReplyMock& mock, const Reply::DataItem& item) + { + EXPECT_CALL(mock, getString()) + .Times(1) + .WillOnce(Return(&item)); + } + + void expectGetInteger(ReplyMock& mock, int value) + { + EXPECT_CALL(mock, getInteger()) + .Times(1) + .WillOnce(Return(value)); + } + + void expectGetArray(ReplyMock& mock, Reply::ReplyVector& replyVector) + { + EXPECT_CALL(mock, getArray()) + .Times(1) + .WillOnce(Return(&replyVector)); + } + + void expectSubscribeReply() + { + expectGetType(subscribeReplyMock, Reply::Type::ARRAY); + expectGetArray(subscribeReplyMock, subscribeReplyVector); + expectGetType(*subscribeReplyArrayElement0, Reply::Type::STRING); + expectGetString(*subscribeReplyArrayElement0, subscribeDataItem); + } + + void expectNotificationReply() + { + expectGetType(notificationReplyMock, Reply::Type::ARRAY); + expectGetArray(notificationReplyMock, notificationReplyVector); + expectGetType(*notificationReplyArrayElement0, Reply::Type::STRING); + expectGetString(*notificationReplyArrayElement0, notificationDataItem); + expectGetType(*notificationReplyArrayElement2, Reply::Type::STRING); + expectGetString(*notificationReplyArrayElement2, notificationMessageDataItem); + } + + void setDefaultResponsesForNotificationReplyParsing() + { + ON_CALL(notificationReplyMock, getType()) + .WillByDefault(Return(Reply::Type::ARRAY)); + ON_CALL(notificationReplyMock, getArray()) + .WillByDefault(Return(¬ificationReplyVector)); + ON_CALL(*notificationReplyArrayElement0, getType()) + .WillByDefault(Return(Reply::Type::STRING)); + ON_CALL(*notificationReplyArrayElement0, getString()) + .WillByDefault(Return(¬ificationDataItem)); + ON_CALL(*notificationReplyArrayElement2, getType()) + .WillByDefault(Return(Reply::Type::STRING)); + ON_CALL(*notificationReplyArrayElement2, getString()) + .WillByDefault(Return(¬ificationMessageDataItem)); + } }; class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest @@ -203,21 +359,42 @@ namespace public: AsyncSentinelDatabaseDiscoveryTest() { - expectNewDispatcherCreated(); + expectDispatchersCreated(); asyncSentinelDatabaseDiscovery.reset( new AsyncSentinelDatabaseDiscovery( engineMock, logger, std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator, - this, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3), + this), contentsBuilderMock)); } + + ~AsyncSentinelDatabaseDiscoveryTest() + { + EXPECT_CALL(*subscriberMock, disableCommandCallbacks()) + .Times(1); + EXPECT_CALL(*dispatcherMock, disableCommandCallbacks()) + .Times(1); + } + }; + + class AsyncSentinelDatabaseDiscoveryInListeningModeTest: public AsyncSentinelDatabaseDiscoveryTest + { + public: + AsyncSentinelDatabaseDiscoveryInListeningModeTest() + { + InSequence dummy; + setStateChangedCbExpectsBeforeMasterInquiry(); + dispatcherConnectAck(); + expectMasterIquiryReply(); + expectStateChangedCb(someHost, somePort); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); + } }; using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest; + + using AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest = AsyncSentinelDatabaseDiscoveryInListeningModeTest; } TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable) @@ -233,115 +410,185 @@ TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, ImplementsAsyncDatabaseDiscovery) EXPECT_TRUE((std::is_base_of::value)); } -TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel) +TEST_F(AsyncSentinelDatabaseDiscoveryTest, SettingChangedCallbackTriggersSentinelNotificationsSubscriptionAndMasterInquiry) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); expectMasterIquiryReply(); - expectStateChangedCb(); - savedCommandCb(std::error_code(), replyMock); + expectStateChangedCb(someHost, somePort); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); } -TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry) +TEST_F(AsyncSentinelDatabaseDiscoveryTest, MasterInquiryErrorTriggersRetry) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); expectMasterInquiryRetryTimer(); - savedCommandCb(getWellKnownErrorCode(), replyMock); + savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock); expectMasterInquiry(); - savedConnectionRetryTimerCallback(); + savedMasterInquiryRetryTimerCallback(); expectMasterIquiryReply(); - expectStateChangedCb(); - savedCommandCb(std::error_code(), replyMock); + expectStateChangedCb(someHost, somePort); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); } TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); - ON_CALL(replyMock, getType()) + ON_CALL(masterInquiryReplyMock, getType()) .WillByDefault(Return(Reply::Type::NIL)); - EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); + EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); } TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); setDefaultResponsesForMasterInquiryReplyParsing(); ON_CALL(*masterInquiryReplyHost, getType()) .WillByDefault(Return(Reply::Type::NIL)); - EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); + EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); } TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); setDefaultResponsesForMasterInquiryReplyParsing(); ON_CALL(*masterInquiryReplyPort, getType()) .WillByDefault(Return(Reply::Type::NIL)); - EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); + EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); } TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); setDefaultResponsesForMasterInquiryReplyParsing(); std::string invalidPort("invalidPort"); Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())}); ON_CALL(*masterInquiryReplyPort, getString()) .WillByDefault(Return(&invalidPortDataItem)); - EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); + EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error"); } TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared) { InSequence dummy; - expectDispatcherWaitConnectedAsync(); - asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb, - this, - std::placeholders::_1)); - expectMasterInquiry(); + setStateChangedCbExpectsBeforeMasterInquiry(); dispatcherConnectAck(); expectMasterInquiryRetryTimer(); - savedCommandCb(getWellKnownErrorCode(), replyMock); + savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock); expectMasterInquiry(); - savedConnectionRetryTimerCallback(); + savedMasterInquiryRetryTimerCallback(); expectMasterIquiryReply(); asyncSentinelDatabaseDiscovery->clearStateChangedCb(); EXPECT_CALL(*this, stateChangedCb(_)) .Times(0); - savedCommandCb(std::error_code(), replyMock); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryTest, ChangeNotificationFromSentinel) +{ + InSequence dummy; + setStateChangedCbExpectsBeforeMasterInquiry(); + dispatcherConnectAck(); + expectMasterIquiryReply(); + expectStateChangedCb(someHost, somePort); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); + expectNotificationReply(); + expectStateChangedCb(someOtherHost, someOtherPort); + savedSubscriberCommandCb(std::error_code(), notificationReplyMock); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscribeCommandErrorTriggersRetry) +{ + InSequence dummy; + expectSubscribeRetryTimer(); + savedSubscriberCommandCb(getWellKnownErrorCode(), subscribeReplyMock); + expectSubscribeNotifications(); + savedSubscribeRetryTimerCallback(); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidReplyType) +{ + InSequence dummy; + ON_CALL(notificationReplyMock, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKindElementType) +{ + InSequence dummy; + setDefaultResponsesForNotificationReplyParsing(); + ON_CALL(*notificationReplyArrayElement0, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKind) +{ + InSequence dummy; + setDefaultResponsesForNotificationReplyParsing(); + std::string invalidKind("invalidKind"); + Reply::DataItem invalidKindDataItem({invalidKind,ReplyStringLength(invalidKind.length())}); + ON_CALL(*notificationReplyArrayElement0, getString()) + .WillByDefault(Return(&invalidKindDataItem)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageElementType) +{ + InSequence dummy; + setDefaultResponsesForNotificationReplyParsing(); + ON_CALL(*notificationReplyArrayElement2, getType()) + .WillByDefault(Return(Reply::Type::NIL)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageStructure) +{ + InSequence dummy; + setDefaultResponsesForNotificationReplyParsing(); + std::string invalidMessage("mymaster oldHost 1234 5678"); + auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())})); + ON_CALL(*notificationReplyArrayElement2, getString()) + .WillByDefault(Return(&invalidMessageDataItem)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidPort) +{ + InSequence dummy; + setDefaultResponsesForNotificationReplyParsing(); + std::string invalidMessage("mymaster oldHost 1234 newHost invalidPort"); + auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())})); + ON_CALL(*notificationReplyArrayElement2, getString()) + .WillByDefault(Return(&invalidMessageDataItem)); + EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error"); +} + +TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscriberDisconnectCallbackTriggersSubscriptionRenewal) +{ + InSequence dummy; + expectSubscriberWaitConnectedAsync(); + subscriberDisconnectCb(); + expectSubscribeNotifications(); + subscriberConnectAck(); + expectSubscribeReply(); + expectDispatcherWaitConnectedAsync(); + savedSubscriberCommandCb(std::error_code(), subscribeReplyMock); + expectMasterInquiry(); + dispatcherConnectAck(); + expectMasterIquiryReply(); + expectStateChangedCb(someHost, somePort); + savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock); } diff --git a/tst/error_test.cpp b/tst/error_test.cpp index 86e5134..4a345b9 100644 --- a/tst/error_test.cpp +++ b/tst/error_test.cpp @@ -87,6 +87,10 @@ TEST_F(ErrorCodesTest, AllAsyncRedisCommandDispatcherErrorCodesHaveCorrectDescri ec = aec; EXPECT_EQ("redis I/O error", getErrorCodeMessage(ec)); break; + case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE: + ec = aec; + EXPECT_EQ("writing to slave", getErrorCodeMessage(ec)); + break; case AsyncRedisCommandDispatcherErrorCode::END_MARKER: ec = aec; EXPECT_EQ("unsupported error code for message()", getErrorCodeMessage(ec)); @@ -141,6 +145,10 @@ TEST_F(ErrorCodesTest, AllAsyncRedisCommandDispatcherErrorCodesAreMappedToCorrec ec = aec; EXPECT_TRUE(ec == InternalError::BACKEND_ERROR); break; + case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE: + ec = aec; + EXPECT_TRUE(ec == InternalError::BACKEND_ERROR); + break; case AsyncRedisCommandDispatcherErrorCode::END_MARKER: ec = aec; EXPECT_TRUE(ec == InternalError::SDL_ERROR_CODE_LOGIC_ERROR); diff --git a/tst/syncstorageimpl_test.cpp b/tst/syncstorageimpl_test.cpp index 7eab410..aea2346 100644 --- a/tst/syncstorageimpl_test.cpp +++ b/tst/syncstorageimpl_test.cpp @@ -557,6 +557,10 @@ TEST_F(SyncStorageImplTest, AllDispatcherErrorCodesThrowCorrectException) expectModifyIfAck(aec, false); EXPECT_THROW(syncStorage->setIfNotExists(ns, "key1", { 0x0a, 0x0b, 0x0c }), BackendError); break; + case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE: + expectModifyIfAck(aec, false); + EXPECT_THROW(syncStorage->setIfNotExists(ns, "key1", { 0x0a, 0x0b, 0x0c }), BackendError); + break; default: FAIL() << "No mapping for AsyncRedisCommandDispatcherErrorCode value: " << aec; break;