Add Sentinel change notification handling logic 28/1028/1
authorRolf Badorek <rolf.badorek@nokia.com>
Mon, 23 Sep 2019 11:14:56 +0000 (14:14 +0300)
committerRolf Badorek <rolf.badorek@nokia.com>
Fri, 27 Sep 2019 07:39:17 +0000 (10:39 +0300)
When new Redis master has been promoted, Sentinel publishes
notification to '+switch-master' channel.

Refer to below web page for further details:
https://redis.io/topics/sentinel

'AsyncSentinelDatabaseDiscovery' will now subscribe notifications for
above mentioned channel. Notification contains information of new Redis
master, which is parsed from message and sent to upper layer via
'StateChangedCb' callback (if callback is set).

When notifications are subscribed from Redis (from Sentinel this case),
connection will go to "subscribed state", and only some pub/sub related
commands are allowed.

Due the above reason, we have two connections (command dispatchers).
One to subscribe notifications and the other for Sentinel commands,
like master inquiry.

Refer to below web page for further details:
https://redis.io/topics/pubsub

In case that subscriber connection goes down, subscription for
notifications is renewed once connection to Sentinel is working again.
Extra master inquiry will be made because we might be missed
notifications during connection cut. Latest master address is refreshed
via 'StateChangedCb', even if has not changed compared to last informed
address. This could be optimized, but as being very rare situation
was not seen worth of extra logic.

In case that the other connection (used for Sentinel commands) is cut,
the related command dispatcher will re-connect in the background.
Possible Sentinel commands during connection cut will fail and trigger
retry after short delay (per already existing implementation).

If some notifications are missed due some other reason than connection
cut, SDL might go to the state that operations will made to Redis slave.
In this situation write operations will fail with a new internal
'AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE' error code,
which is mapped to 'shareddatalayer::Error::BACKEND_FAILURE'. Recovery
instructions adjusted a bit, so that re-creating SDL API instance is
not optional recovery step (it is the only way to recover from above
mentioned situation currently).

Sentinel support is still disabled, missing implementation will be
added soon as a separate commit(s).

Signed-off-by: Rolf Badorek <rolf.badorek@nokia.com>
Change-Id: I1bb9e121985ee22278e780e50ab13f88acdc65c5

include/private/error.hpp
include/private/redis/asyncsentineldatabasediscovery.hpp
include/sdl/errorqueries.hpp
src/error.cpp
src/redis/asyncsentineldatabasediscovery.cpp
src/redis/redisgeneral.cpp
tst/asyncsentineldatabasediscovery_test.cpp
tst/error_test.cpp
tst/syncstorageimpl_test.cpp

index 6b28f75..70f6b1a 100644 (file)
@@ -67,6 +67,7 @@ namespace shareddatalayer
             DATASET_LOADING,
             NOT_CONNECTED,
             IO_ERROR,
+            WRITING_TO_SLAVE,
             //Keep this always as last item. Used in unit tests to loop all enum values.
             END_MARKER
         };
index 527fa4e..98d1d23 100644 (file)
@@ -44,7 +44,8 @@ namespace shareddatalayer
             using AsyncCommandDispatcherCreator = std::function<std::shared_ptr<redis::AsyncCommandDispatcher>(Engine& engine,
                                                                                                                const redis::DatabaseInfo& databaseInfo,
                                                                                                                std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
-                                                                                                               std::shared_ptr<Logger> logger)>;
+                                                                                                               std::shared_ptr<Logger> logger,
+                                                                                                               bool usePermanentCommandCallbacks)>;
 
             AsyncSentinelDatabaseDiscovery(const AsyncSentinelDatabaseDiscovery&) = delete;
 
@@ -58,7 +59,7 @@ namespace shareddatalayer
                                            const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
                                            std::shared_ptr<redis::ContentsBuilder> contentsBuilder);
 
-            ~AsyncSentinelDatabaseDiscovery() override = default;
+            ~AsyncSentinelDatabaseDiscovery() override;
 
             void setStateChangedCb(const StateChangedCb& stateChangedCb) override;
 
@@ -70,11 +71,18 @@ namespace shareddatalayer
             std::shared_ptr<Logger> logger;
             StateChangedCb stateChangedCb;
             DatabaseInfo databaseInfo;
+            std::shared_ptr<redis::AsyncCommandDispatcher> subscriber;
             std::shared_ptr<redis::AsyncCommandDispatcher> dispatcher;
             std::shared_ptr<redis::ContentsBuilder> contentsBuilder;
+            Timer subscribeRetryTimer;
+            Timer::Duration subscribeRetryTimerDuration;
             Timer masterInquiryRetryTimer;
             Timer::Duration masterInquiryRetryTimerDuration;
 
+            void subscribeNotifications();
+
+            void subscribeAck(const std::error_code& error, const Reply& reply);
+
             void sendMasterInquiry();
 
             void masterInquiryAck(const std::error_code& error, const Reply& reply);
index d8b7e01..5fdfadd 100644 (file)
@@ -62,9 +62,9 @@ namespace shareddatalayer
      * <code>shareddatalayer::Error::BACKEND_FAILURE</code>:<br>
      * shareddatalayer delivered the request to the backend data storage but the backend data storage failed to process the request.
      * In case of a write type request, data in the backend data storage may or may not have been altered.<br>
-     * Client is advised to try the operation again later. Optionally client can also re-create the used shareddatalayer instance.
+     * Client is advised to try the operation again later. If also re-tries are failing client should re-create the used shareddatalayer instance.
      * It is possible that the system does not automatically recover from this type of error situations. Therefore client is advised
-     * to escalate the problem to O&M if operation does not succeed after several retries.<br>
+     * to escalate the problem to O&M if operation does not succeed after above mentioned recovery actions.<br>
      * When shareddatalayer operations work again, client can choose how to best address the possible loss of consistency
      * (see <code>shareddatalayer::Error::OPERATION_INTERRUPTED</code> for possible options).
      *
index 347e651..d6bbaa8 100644 (file)
@@ -109,6 +109,8 @@ namespace
                 return "redis error";
             case AsyncRedisCommandDispatcherErrorCode::IO_ERROR:
                 return "redis I/O error";
+            case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE:
+                return "writing to slave";
             case AsyncRedisCommandDispatcherErrorCode::END_MARKER:
                 logErrorOnce("AsyncRedisCommandDispatcherErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
                 return "unsupported error code for message()";
@@ -137,6 +139,8 @@ namespace
                 return InternalError::BACKEND_ERROR;
             case AsyncRedisCommandDispatcherErrorCode::IO_ERROR:
                 return InternalError::BACKEND_ERROR;
+            case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE:
+                return InternalError::BACKEND_ERROR;
             case AsyncRedisCommandDispatcherErrorCode::END_MARKER:
                 logErrorOnce("AsyncRedisCommandDispatcherErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
index 05815e2..be51c5e 100644 (file)
 */
 
 #include <arpa/inet.h>
+#include <boost/algorithm/string.hpp>
 #include <iostream>
 #include <string>
+#include <vector>
 #include <sdl/asyncstorage.hpp>
 #include "private/abort.hpp"
 #include "private/hostandport.hpp"
@@ -34,9 +36,23 @@ namespace
     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
                                                                           const DatabaseInfo& databaseInfo,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
-                                                                          std::shared_ptr<Logger> logger);
+                                                                          std::shared_ptr<Logger> logger,
+                                                                          bool usePermanentCommandCallbacks);
+
+    struct SubscribeReply
+    {
+        enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
+        Type type;
+        std::string message;
+
+        SubscribeReply(): type(Type::UNKNOWN) { }
+    };
+
+    std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
 
     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
+
+    std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
 }
 
 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
@@ -60,19 +76,40 @@ AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<E
                      boost::none,
                      DatabaseInfo::Discovery::SENTINEL})),
         contentsBuilder(contentsBuilder),
+        subscribeRetryTimer(*engine),
+        subscribeRetryTimerDuration(std::chrono::seconds(1)),
         masterInquiryRetryTimer(*engine),
         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
 {
+    subscriber = asyncCommandDispatcherCreator(*engine,
+                                               databaseInfo,
+                                               contentsBuilder,
+                                               logger,
+                                               true);
     dispatcher = asyncCommandDispatcherCreator(*engine,
                                                databaseInfo,
                                                contentsBuilder,
-                                               logger);
+                                               logger,
+                                               false);
+}
+
+AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
+{
+    if (subscriber)
+        subscriber->disableCommandCallbacks();
+    if (dispatcher)
+        dispatcher->disableCommandCallbacks();
+    stateChangedCb = nullptr;
 }
 
 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
 {
     stateChangedCb = cb;
-    dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+    subscriber->registerDisconnectCb([this]()
+            {
+                subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+            });
+    subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
 }
 
 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
@@ -80,13 +117,70 @@ void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
     stateChangedCb = nullptr;
 }
 
+void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
+{
+    subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              "dummyNamespace", // Not meaningful for Sentinel
+                              contentsBuilder->build("SUBSCRIBE", "+switch-master"));
+}
+
+void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
+                                                  const Reply& reply)
+{
+    if (!error)
+    {
+        auto subscribeReply = parseSubscribeReply(reply, *logger);
+        if (subscribeReply)
+        {
+            switch (subscribeReply->type)
+            {
+                case (SubscribeReply::Type::SUBSCRIBE_REPLY):
+                {
+                    dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+                    break;
+                }
+                case (SubscribeReply::Type::NOTIFICATION):
+                {
+                    auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
+                    if (hostAndPort)
+                    {
+                        auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
+                                                       DatabaseInfo::Type::SINGLE,
+                                                       boost::none,
+                                                       DatabaseInfo::Discovery::SENTINEL}));
+                        if (stateChangedCb)
+                            stateChangedCb(databaseInfo);
+                    }
+                    else
+                        SHAREDDATALAYER_ABORT("Notification message parsing error.");
+                    break;
+                }
+                case (SubscribeReply::Type::UNKNOWN):
+                {
+                    logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
+                    SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
+                }
+            }
+        }
+        else
+            SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
+    }
+    else
+        subscribeRetryTimer.arm(
+                subscribeRetryTimerDuration,
+                std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+}
+
 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
 {
     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
                                         this,
                                         std::placeholders::_1,
                                         std::placeholders::_2),
-                              "dummyNamespace", // Not meaningful for SENTINEL commands
+                              "dummyNamespace", // Not meaningful for Sentinel
                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
 }
 
@@ -121,16 +215,59 @@ namespace
     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
                                                                           const DatabaseInfo& databaseInfo,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
-                                                                          std::shared_ptr<Logger> logger)
+                                                                          std::shared_ptr<Logger> logger,
+                                                                          bool usePermanentCommandCallbacks)
     {
         return AsyncCommandDispatcher::create(engine,
                                               databaseInfo,
                                               contentsBuilder,
-                                              false,
+                                              usePermanentCommandCallbacks,
                                               logger,
                                               true);
     }
 
+    std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
+    {
+        // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
+        auto replyType = reply.getType();
+        if (replyType == Reply::Type::ARRAY)
+        {
+            auto& replyVector(*reply.getArray());
+            auto firstElementType = replyVector[0]->getType();
+            if (firstElementType == Reply::Type::STRING)
+            {
+                auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
+                auto kind(replyVector[0]->getString()->str);
+                if (kind == "subscribe")
+                {
+                    subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
+                    return subscribeReply;
+                }
+                else if (kind == "message")
+                {
+                    subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
+                    auto thirdElementType = replyVector[2]->getType();
+                    if (thirdElementType == Reply::Type::STRING)
+                    {
+                        subscribeReply->message = replyVector[2]->getString()->str;
+                        return subscribeReply;
+                    }
+                    else
+                        logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
+                }
+                else
+                    logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
+            }
+            else
+                logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
+                               << static_cast<int>(firstElementType) << std::endl;
+        }
+        else
+            logger.debug() << "Invalid SUBSCRIBE reply type: "
+                           << static_cast<int>(replyType) << std::endl;
+        return nullptr;
+    }
+
     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
     {
         auto replyType = reply.getType();
@@ -169,4 +306,28 @@ namespace
                            << static_cast<int>(replyType) << std::endl;
         return nullptr;
     }
+
+    std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
+    {
+        std::vector<std::string> splittedMessage;
+        boost::split(splittedMessage, message, boost::is_any_of(" "));
+        if (splittedMessage.size() == 5)
+        {
+            auto host = splittedMessage[3];
+            auto port = splittedMessage[4];
+            try
+            {
+                return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
+            }
+            catch (const std::exception& e)
+            {
+                logger.debug() << "Invalid host or port in notification message, host: "
+                               << host << ", port: " << port
+                               << ", exception: " << e.what() << std::endl;
+            }
+        }
+        else
+            logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
+        return nullptr;
+    }
 }
index eb837ae..f031a77 100644 (file)
@@ -70,6 +70,9 @@ namespace
         if (startsWith("ERR Protocol error", rr->str, static_cast<size_t>(rr->len)))
             return AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR;
 
+        if (startsWith("READONLY", rr->str, static_cast<size_t>(rr->len)))
+            return AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE;
+
         std::ostringstream oss;
         oss << "redis reply error: " << std::string(rr->str, static_cast<size_t>(rr->len));
         logErrorOnce(oss.str());
index dcf35c7..ab48284 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <gtest/gtest.h>
 #include <arpa/inet.h>
+#include <string>
 #include <sdl/asyncstorage.hpp>
 #include "private/createlogger.hpp"
 #include "private/hostandport.hpp"
@@ -39,60 +40,131 @@ namespace
     public:
         std::unique_ptr<AsyncSentinelDatabaseDiscovery> asyncSentinelDatabaseDiscovery;
         std::shared_ptr<StrictMock<EngineMock>> engineMock;
+        std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> subscriberMock;
         std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> dispatcherMock;
         std::shared_ptr<StrictMock<ContentsBuilderMock>> contentsBuilderMock;
         std::shared_ptr<Logger> logger;
         Contents contents;
+        AsyncCommandDispatcher::ConnectAck subscriberConnectAck;
+        AsyncCommandDispatcher::DisconnectCb subscriberDisconnectCb;
         AsyncCommandDispatcher::ConnectAck dispatcherConnectAck;
-        AsyncCommandDispatcher::CommandCb savedCommandCb;
-        ReplyMock replyMock;
+        AsyncCommandDispatcher::CommandCb savedSubscriberCommandCb;
+        AsyncCommandDispatcher::CommandCb savedDispatcherCommandCb;
+        ReplyMock masterInquiryReplyMock;
         std::string someHost;
         uint16_t somePort;
+        std::string someOtherHost;
+        uint16_t someOtherPort;
         Reply::DataItem hostDataItem;
         Reply::DataItem portDataItem;
         std::shared_ptr<ReplyMock> masterInquiryReplyHost;
         std::shared_ptr<ReplyMock> masterInquiryReplyPort;
         Reply::ReplyVector masterInquiryReply;
         Timer::Duration expectedMasterInquiryRetryTimerDuration;
-        Timer::Callback savedConnectionRetryTimerCallback;
+        Timer::Callback savedMasterInquiryRetryTimerCallback;
+        // Mocks for SUBSCRIBE command replies are a bit complicated, because reply might have several
+        // meanings/structures: https://redis.io/topics/pubsub#format-of-pushed-messages
+        ReplyMock subscribeReplyMock;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement0;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement1;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement2;
+        Reply::ReplyVector subscribeReplyVector;
+        Reply::DataItem subscribeDataItem;
+        ReplyMock notificationReplyMock;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement0;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement1;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement2;
+        Reply::ReplyVector notificationReplyVector;
+        Reply::DataItem notificationDataItem;
+        std::string notificationMessage;
+        Reply::DataItem notificationMessageDataItem;
+        Timer::Duration expectedSubscribeRetryTimerDuration;
+        Timer::Callback savedSubscribeRetryTimerCallback;
 
         AsyncSentinelDatabaseDiscoveryBaseTest():
             engineMock(std::make_shared<StrictMock<EngineMock>>()),
-            dispatcherMock(std::make_shared<StrictMock<AsyncCommandDispatcherMock>>()),
             contentsBuilderMock(std::make_shared<StrictMock<ContentsBuilderMock>>(AsyncStorage::SEPARATOR)),
             logger(createLogger(SDL_LOG_PREFIX)),
             contents({{"aaa","bbb"},{3,3}}),
             someHost("somehost"),
             somePort(1234),
+            someOtherHost("someotherhost"),
+            someOtherPort(5678),
             hostDataItem({someHost,ReplyStringLength(someHost.length())}),
             portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}),
             masterInquiryReplyHost(std::make_shared<ReplyMock>()),
             masterInquiryReplyPort(std::make_shared<ReplyMock>()),
-            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1))
+            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)),
+            subscribeReplyArrayElement0(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement1(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement2(std::make_shared<ReplyMock>()),
+            subscribeDataItem({"subscribe",9}),
+            notificationReplyArrayElement0(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement1(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement2(std::make_shared<ReplyMock>()),
+            notificationDataItem({"message",7}),
+            notificationMessage("mymaster " + someHost + " " + std::to_string(somePort) + " " + someOtherHost + " " + std::to_string(someOtherPort)),
+            notificationMessageDataItem({notificationMessage, ReplyStringLength(notificationMessage.length())}),
+            expectedSubscribeRetryTimerDuration(std::chrono::seconds(1))
         {
             masterInquiryReply.push_back(masterInquiryReplyHost);
             masterInquiryReply.push_back(masterInquiryReplyPort);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement0);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement1);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement2);
+            notificationReplyVector.push_back(notificationReplyArrayElement0);
+            notificationReplyVector.push_back(notificationReplyArrayElement1);
+            notificationReplyVector.push_back(notificationReplyArrayElement2);
         }
 
         virtual ~AsyncSentinelDatabaseDiscoveryBaseTest()
         {
         }
 
-        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine&,
-                                                                              const DatabaseInfo&,
-                                                                              std::shared_ptr<ContentsBuilder>)
+        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator()
         {
             // @TODO Add database info checking when configuration support for sentinel is added.
-            newDispatcherCreated();
-            return dispatcherMock;
+            if (!subscriberMock)
+            {
+                subscriberMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return subscriberMock;
+            }
+            if (!dispatcherMock)
+            {
+                dispatcherMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return dispatcherMock;
+            }
+            return nullptr;
         }
 
         MOCK_METHOD0(newDispatcherCreated, void());
 
-        void expectNewDispatcherCreated()
+        void expectDispatchersCreated()
         {
             EXPECT_CALL(*this, newDispatcherCreated())
-                .Times(1);
+                .Times(2);
+        }
+
+        void expectSubscriberWaitConnectedAsync()
+        {
+            EXPECT_CALL(*subscriberMock, waitConnectedAsync(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck)
+                        {
+                            subscriberConnectAck = connectAck;
+                        }));
+        }
+
+        void expectSubscriberRegisterDisconnectCb()
+        {
+            EXPECT_CALL(*subscriberMock, registerDisconnectCb(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::DisconnectCb& disconnectCb)
+                        {
+                            subscriberDisconnectCb = disconnectCb;
+                        }));
         }
 
         void expectDispatcherWaitConnectedAsync()
@@ -105,6 +177,14 @@ namespace
                         }));
         }
 
+        void expectContentsBuild(const std::string& string,
+                                 const std::string& string2)
+        {
+            EXPECT_CALL(*contentsBuilderMock, build(string, string2))
+                .Times(1)
+                .WillOnce(Return(contents));
+        }
+
         void expectContentsBuild(const std::string& string,
                                  const std::string& string2,
                                  const std::string& string3)
@@ -114,28 +194,41 @@ namespace
                 .WillOnce(Return(contents));
         }
 
-        void expectDispatchAsync()
+        void expectSubscriberDispatchAsync()
+        {
+            EXPECT_CALL(*subscriberMock, dispatchAsync(_, _, contents))
+                .Times(1)
+                .WillOnce(SaveArg<0>(&savedSubscriberCommandCb));
+        }
+
+        void expectDispatcherDispatchAsync()
         {
             EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents))
                 .Times(1)
-                .WillOnce(SaveArg<0>(&savedCommandCb));
+                .WillOnce(SaveArg<0>(&savedDispatcherCommandCb));
+        }
+
+        void expectSubscribeNotifications()
+        {
+            expectContentsBuild("SUBSCRIBE", "+switch-master");
+            expectSubscriberDispatchAsync();
         }
 
         void expectMasterInquiry()
         {
             expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster");
-            expectDispatchAsync();
+            expectDispatcherDispatchAsync();
         }
 
         MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&));
 
-        void expectStateChangedCb()
+        void expectStateChangedCb(const std::string& host, uint16_t port)
         {
             EXPECT_CALL(*this, stateChangedCb(_))
                 .Times(1)
-                .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo)
+                .WillOnce(Invoke([this, host, port](const DatabaseInfo& databaseInfo)
                                  {
-                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }),
+                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(host, htons(port)) }),
                                                  ContainerEq(databaseInfo.hosts));
                                      EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type);
                                      EXPECT_EQ(boost::none, databaseInfo.ns);
@@ -143,49 +236,50 @@ namespace
                                  }));
         }
 
-        void expectGetReplyType(ReplyMock& mock, const Reply::Type& type)
+        void expectMasterIquiryReply()
         {
-            EXPECT_CALL(mock, getType())
-                .Times(1)
-                .WillOnce(Return(type));
+            expectGetType(masterInquiryReplyMock, Reply::Type::ARRAY);
+            expectGetArray(masterInquiryReplyMock, masterInquiryReply);
+            expectGetType(*masterInquiryReplyHost, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyHost, hostDataItem);
+            expectGetType(*masterInquiryReplyPort, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyPort, portDataItem);
         }
 
-        void expectGetReplyArray_ReturnMasterInquiryReply()
+        void expectMasterInquiryRetryTimer()
         {
-            EXPECT_CALL(replyMock, getArray())
+            EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
                 .Times(1)
-                .WillOnce(Return(&masterInquiryReply));
+                .WillOnce(SaveArg<2>(&savedMasterInquiryRetryTimerCallback));
         }
 
-        void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item)
+        void expectSubscribeRetryTimer()
         {
-            EXPECT_CALL(mock, getString())
+            EXPECT_CALL(*engineMock, armTimer(_, expectedSubscribeRetryTimerDuration, _))
                 .Times(1)
-                .WillOnce(Return(&item));
+                .WillOnce(SaveArg<2>(&savedSubscribeRetryTimerCallback));
         }
 
-        void expectMasterIquiryReply()
+        void setStateChangedCbExpectsBeforeMasterInquiry()
         {
-            expectGetReplyType(replyMock, Reply::Type::ARRAY);
-            expectGetReplyArray_ReturnMasterInquiryReply();
-            expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyHost, hostDataItem);
-            expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyPort, portDataItem);
-        }
-
-        void expectMasterInquiryRetryTimer()
-        {
-            EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
-                .Times(1)
-                .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+            expectSubscriberRegisterDisconnectCb();
+            expectSubscriberWaitConnectedAsync();
+            asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::stateChangedCb,
+                    this,
+                    std::placeholders::_1));
+            expectSubscribeNotifications();
+            subscriberConnectAck();
+            expectSubscribeReply();
+            expectDispatcherWaitConnectedAsync();
+            savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+            expectMasterInquiry();
         }
 
         void setDefaultResponsesForMasterInquiryReplyParsing()
         {
-            ON_CALL(replyMock, getType())
+            ON_CALL(masterInquiryReplyMock, getType())
                 .WillByDefault(Return(Reply::Type::ARRAY));
-            ON_CALL(replyMock, getArray())
+            ON_CALL(masterInquiryReplyMock, getArray())
                 .WillByDefault(Return(&masterInquiryReply));
             ON_CALL(*masterInquiryReplyHost, getType())
                 .WillByDefault(Return(Reply::Type::STRING));
@@ -196,6 +290,68 @@ namespace
             ON_CALL(*masterInquiryReplyHost, getString())
                 .WillByDefault(Return(&portDataItem));
         }
+
+        void expectGetType(ReplyMock& mock, const Reply::Type& type)
+        {
+            EXPECT_CALL(mock, getType())
+                .Times(1)
+                .WillOnce(Return(type));
+        }
+
+        void expectGetString(ReplyMock& mock, const Reply::DataItem& item)
+        {
+            EXPECT_CALL(mock, getString())
+                .Times(1)
+                .WillOnce(Return(&item));
+        }
+
+        void expectGetInteger(ReplyMock& mock, int value)
+        {
+            EXPECT_CALL(mock, getInteger())
+                .Times(1)
+                .WillOnce(Return(value));
+        }
+
+        void expectGetArray(ReplyMock& mock, Reply::ReplyVector& replyVector)
+        {
+            EXPECT_CALL(mock, getArray())
+                .Times(1)
+                .WillOnce(Return(&replyVector));
+        }
+
+        void expectSubscribeReply()
+        {
+            expectGetType(subscribeReplyMock, Reply::Type::ARRAY);
+            expectGetArray(subscribeReplyMock, subscribeReplyVector);
+            expectGetType(*subscribeReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*subscribeReplyArrayElement0, subscribeDataItem);
+        }
+
+        void expectNotificationReply()
+        {
+            expectGetType(notificationReplyMock, Reply::Type::ARRAY);
+            expectGetArray(notificationReplyMock, notificationReplyVector);
+            expectGetType(*notificationReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement0, notificationDataItem);
+            expectGetType(*notificationReplyArrayElement2, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement2, notificationMessageDataItem);
+        }
+
+        void setDefaultResponsesForNotificationReplyParsing()
+        {
+            ON_CALL(notificationReplyMock, getType())
+                .WillByDefault(Return(Reply::Type::ARRAY));
+            ON_CALL(notificationReplyMock, getArray())
+                .WillByDefault(Return(&notificationReplyVector));
+            ON_CALL(*notificationReplyArrayElement0, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement0, getString())
+                .WillByDefault(Return(&notificationDataItem));
+            ON_CALL(*notificationReplyArrayElement2, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement2, getString())
+                .WillByDefault(Return(&notificationMessageDataItem));
+        }
     };
 
     class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest
@@ -203,21 +359,42 @@ namespace
     public:
         AsyncSentinelDatabaseDiscoveryTest()
         {
-            expectNewDispatcherCreated();
+            expectDispatchersCreated();
             asyncSentinelDatabaseDiscovery.reset(
                     new AsyncSentinelDatabaseDiscovery(
                             engineMock,
                             logger,
                             std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator,
-                                      this,
-                                      std::placeholders::_1,
-                                      std::placeholders::_2,
-                                      std::placeholders::_3),
+                                      this),
                             contentsBuilderMock));
         }
+
+        ~AsyncSentinelDatabaseDiscoveryTest()
+        {
+            EXPECT_CALL(*subscriberMock, disableCommandCallbacks())
+                .Times(1);
+            EXPECT_CALL(*dispatcherMock, disableCommandCallbacks())
+                .Times(1);
+        }
+    };
+
+    class AsyncSentinelDatabaseDiscoveryInListeningModeTest: public AsyncSentinelDatabaseDiscoveryTest
+    {
+    public:
+        AsyncSentinelDatabaseDiscoveryInListeningModeTest()
+        {
+            InSequence dummy;
+            setStateChangedCbExpectsBeforeMasterInquiry();
+            dispatcherConnectAck();
+            expectMasterIquiryReply();
+            expectStateChangedCb(someHost, somePort);
+            savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+        }
     };
 
     using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest;
+
+    using AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest = AsyncSentinelDatabaseDiscoveryInListeningModeTest;
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable)
@@ -233,115 +410,185 @@ TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, ImplementsAsyncDatabaseDiscovery)
     EXPECT_TRUE((std::is_base_of<AsyncDatabaseDiscovery, AsyncSentinelDatabaseDiscovery>::value));
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, SettingChangedCallbackTriggersSentinelNotificationsSubscriptionAndMasterInquiry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, MasterInquiryErrorTriggersRetry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
-    ON_CALL(replyMock, getType())
+    ON_CALL(masterInquiryReplyMock, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyHost, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyPort, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     std::string invalidPort("invalidPort");
     Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())});
     ON_CALL(*masterInquiryReplyPort, getString())
         .WillByDefault(Return(&invalidPortDataItem));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
     asyncSentinelDatabaseDiscovery->clearStateChangedCb();
     EXPECT_CALL(*this, stateChangedCb(_))
         .Times(0);
-    savedCommandCb(std::error_code(), replyMock);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, ChangeNotificationFromSentinel)
+{
+    InSequence dummy;
+    setStateChangedCbExpectsBeforeMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+    expectNotificationReply();
+    expectStateChangedCb(someOtherHost, someOtherPort);
+    savedSubscriberCommandCb(std::error_code(), notificationReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscribeCommandErrorTriggersRetry)
+{
+    InSequence dummy;
+    expectSubscribeRetryTimer();
+    savedSubscriberCommandCb(getWellKnownErrorCode(), subscribeReplyMock);
+    expectSubscribeNotifications();
+    savedSubscribeRetryTimerCallback();
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidReplyType)
+{
+    InSequence dummy;
+    ON_CALL(notificationReplyMock, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKindElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement0, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKind)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidKind("invalidKind");
+    Reply::DataItem invalidKindDataItem({invalidKind,ReplyStringLength(invalidKind.length())});
+    ON_CALL(*notificationReplyArrayElement0, getString())
+        .WillByDefault(Return(&invalidKindDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement2, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageStructure)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 5678");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidPort)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 newHost invalidPort");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscriberDisconnectCallbackTriggersSubscriptionRenewal)
+{
+    InSequence dummy;
+    expectSubscriberWaitConnectedAsync();
+    subscriberDisconnectCb();
+    expectSubscribeNotifications();
+    subscriberConnectAck();
+    expectSubscribeReply();
+    expectDispatcherWaitConnectedAsync();
+    savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+    expectMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
index 86e5134..4a345b9 100644 (file)
@@ -87,6 +87,10 @@ TEST_F(ErrorCodesTest, AllAsyncRedisCommandDispatcherErrorCodesHaveCorrectDescri
                 ec = aec;
                 EXPECT_EQ("redis I/O error", getErrorCodeMessage(ec));
                 break;
+            case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE:
+                ec = aec;
+                EXPECT_EQ("writing to slave", getErrorCodeMessage(ec));
+                break;
             case AsyncRedisCommandDispatcherErrorCode::END_MARKER:
                 ec = aec;
                 EXPECT_EQ("unsupported error code for message()", getErrorCodeMessage(ec));
@@ -141,6 +145,10 @@ TEST_F(ErrorCodesTest, AllAsyncRedisCommandDispatcherErrorCodesAreMappedToCorrec
                 ec = aec;
                 EXPECT_TRUE(ec == InternalError::BACKEND_ERROR);
                 break;
+            case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE:
+                ec = aec;
+                EXPECT_TRUE(ec == InternalError::BACKEND_ERROR);
+                break;
             case AsyncRedisCommandDispatcherErrorCode::END_MARKER:
                 ec = aec;
                 EXPECT_TRUE(ec == InternalError::SDL_ERROR_CODE_LOGIC_ERROR);
index 7eab410..aea2346 100644 (file)
@@ -557,6 +557,10 @@ TEST_F(SyncStorageImplTest, AllDispatcherErrorCodesThrowCorrectException)
                 expectModifyIfAck(aec, false);
                 EXPECT_THROW(syncStorage->setIfNotExists(ns, "key1", { 0x0a, 0x0b, 0x0c }), BackendError);
                 break;
+            case AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE:
+                expectModifyIfAck(aec, false);
+                EXPECT_THROW(syncStorage->setIfNotExists(ns, "key1", { 0x0a, 0x0b, 0x0c }), BackendError);
+                break;
             default:
                 FAIL() << "No mapping for AsyncRedisCommandDispatcherErrorCode value: " << aec;
                 break;