Add Sentinel change notification handling logic
[ric-plt/sdl.git] / tst / asyncsentineldatabasediscovery_test.cpp
index dcf35c7..ab48284 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <gtest/gtest.h>
 #include <arpa/inet.h>
+#include <string>
 #include <sdl/asyncstorage.hpp>
 #include "private/createlogger.hpp"
 #include "private/hostandport.hpp"
@@ -39,60 +40,131 @@ namespace
     public:
         std::unique_ptr<AsyncSentinelDatabaseDiscovery> asyncSentinelDatabaseDiscovery;
         std::shared_ptr<StrictMock<EngineMock>> engineMock;
+        std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> subscriberMock;
         std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> dispatcherMock;
         std::shared_ptr<StrictMock<ContentsBuilderMock>> contentsBuilderMock;
         std::shared_ptr<Logger> logger;
         Contents contents;
+        AsyncCommandDispatcher::ConnectAck subscriberConnectAck;
+        AsyncCommandDispatcher::DisconnectCb subscriberDisconnectCb;
         AsyncCommandDispatcher::ConnectAck dispatcherConnectAck;
-        AsyncCommandDispatcher::CommandCb savedCommandCb;
-        ReplyMock replyMock;
+        AsyncCommandDispatcher::CommandCb savedSubscriberCommandCb;
+        AsyncCommandDispatcher::CommandCb savedDispatcherCommandCb;
+        ReplyMock masterInquiryReplyMock;
         std::string someHost;
         uint16_t somePort;
+        std::string someOtherHost;
+        uint16_t someOtherPort;
         Reply::DataItem hostDataItem;
         Reply::DataItem portDataItem;
         std::shared_ptr<ReplyMock> masterInquiryReplyHost;
         std::shared_ptr<ReplyMock> masterInquiryReplyPort;
         Reply::ReplyVector masterInquiryReply;
         Timer::Duration expectedMasterInquiryRetryTimerDuration;
-        Timer::Callback savedConnectionRetryTimerCallback;
+        Timer::Callback savedMasterInquiryRetryTimerCallback;
+        // Mocks for SUBSCRIBE command replies are a bit complicated, because reply might have several
+        // meanings/structures: https://redis.io/topics/pubsub#format-of-pushed-messages
+        ReplyMock subscribeReplyMock;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement0;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement1;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement2;
+        Reply::ReplyVector subscribeReplyVector;
+        Reply::DataItem subscribeDataItem;
+        ReplyMock notificationReplyMock;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement0;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement1;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement2;
+        Reply::ReplyVector notificationReplyVector;
+        Reply::DataItem notificationDataItem;
+        std::string notificationMessage;
+        Reply::DataItem notificationMessageDataItem;
+        Timer::Duration expectedSubscribeRetryTimerDuration;
+        Timer::Callback savedSubscribeRetryTimerCallback;
 
         AsyncSentinelDatabaseDiscoveryBaseTest():
             engineMock(std::make_shared<StrictMock<EngineMock>>()),
-            dispatcherMock(std::make_shared<StrictMock<AsyncCommandDispatcherMock>>()),
             contentsBuilderMock(std::make_shared<StrictMock<ContentsBuilderMock>>(AsyncStorage::SEPARATOR)),
             logger(createLogger(SDL_LOG_PREFIX)),
             contents({{"aaa","bbb"},{3,3}}),
             someHost("somehost"),
             somePort(1234),
+            someOtherHost("someotherhost"),
+            someOtherPort(5678),
             hostDataItem({someHost,ReplyStringLength(someHost.length())}),
             portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}),
             masterInquiryReplyHost(std::make_shared<ReplyMock>()),
             masterInquiryReplyPort(std::make_shared<ReplyMock>()),
-            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1))
+            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)),
+            subscribeReplyArrayElement0(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement1(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement2(std::make_shared<ReplyMock>()),
+            subscribeDataItem({"subscribe",9}),
+            notificationReplyArrayElement0(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement1(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement2(std::make_shared<ReplyMock>()),
+            notificationDataItem({"message",7}),
+            notificationMessage("mymaster " + someHost + " " + std::to_string(somePort) + " " + someOtherHost + " " + std::to_string(someOtherPort)),
+            notificationMessageDataItem({notificationMessage, ReplyStringLength(notificationMessage.length())}),
+            expectedSubscribeRetryTimerDuration(std::chrono::seconds(1))
         {
             masterInquiryReply.push_back(masterInquiryReplyHost);
             masterInquiryReply.push_back(masterInquiryReplyPort);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement0);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement1);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement2);
+            notificationReplyVector.push_back(notificationReplyArrayElement0);
+            notificationReplyVector.push_back(notificationReplyArrayElement1);
+            notificationReplyVector.push_back(notificationReplyArrayElement2);
         }
 
         virtual ~AsyncSentinelDatabaseDiscoveryBaseTest()
         {
         }
 
-        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine&,
-                                                                              const DatabaseInfo&,
-                                                                              std::shared_ptr<ContentsBuilder>)
+        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator()
         {
             // @TODO Add database info checking when configuration support for sentinel is added.
-            newDispatcherCreated();
-            return dispatcherMock;
+            if (!subscriberMock)
+            {
+                subscriberMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return subscriberMock;
+            }
+            if (!dispatcherMock)
+            {
+                dispatcherMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return dispatcherMock;
+            }
+            return nullptr;
         }
 
         MOCK_METHOD0(newDispatcherCreated, void());
 
-        void expectNewDispatcherCreated()
+        void expectDispatchersCreated()
         {
             EXPECT_CALL(*this, newDispatcherCreated())
-                .Times(1);
+                .Times(2);
+        }
+
+        void expectSubscriberWaitConnectedAsync()
+        {
+            EXPECT_CALL(*subscriberMock, waitConnectedAsync(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck)
+                        {
+                            subscriberConnectAck = connectAck;
+                        }));
+        }
+
+        void expectSubscriberRegisterDisconnectCb()
+        {
+            EXPECT_CALL(*subscriberMock, registerDisconnectCb(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::DisconnectCb& disconnectCb)
+                        {
+                            subscriberDisconnectCb = disconnectCb;
+                        }));
         }
 
         void expectDispatcherWaitConnectedAsync()
@@ -105,6 +177,14 @@ namespace
                         }));
         }
 
+        void expectContentsBuild(const std::string& string,
+                                 const std::string& string2)
+        {
+            EXPECT_CALL(*contentsBuilderMock, build(string, string2))
+                .Times(1)
+                .WillOnce(Return(contents));
+        }
+
         void expectContentsBuild(const std::string& string,
                                  const std::string& string2,
                                  const std::string& string3)
@@ -114,28 +194,41 @@ namespace
                 .WillOnce(Return(contents));
         }
 
-        void expectDispatchAsync()
+        void expectSubscriberDispatchAsync()
+        {
+            EXPECT_CALL(*subscriberMock, dispatchAsync(_, _, contents))
+                .Times(1)
+                .WillOnce(SaveArg<0>(&savedSubscriberCommandCb));
+        }
+
+        void expectDispatcherDispatchAsync()
         {
             EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents))
                 .Times(1)
-                .WillOnce(SaveArg<0>(&savedCommandCb));
+                .WillOnce(SaveArg<0>(&savedDispatcherCommandCb));
+        }
+
+        void expectSubscribeNotifications()
+        {
+            expectContentsBuild("SUBSCRIBE", "+switch-master");
+            expectSubscriberDispatchAsync();
         }
 
         void expectMasterInquiry()
         {
             expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster");
-            expectDispatchAsync();
+            expectDispatcherDispatchAsync();
         }
 
         MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&));
 
-        void expectStateChangedCb()
+        void expectStateChangedCb(const std::string& host, uint16_t port)
         {
             EXPECT_CALL(*this, stateChangedCb(_))
                 .Times(1)
-                .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo)
+                .WillOnce(Invoke([this, host, port](const DatabaseInfo& databaseInfo)
                                  {
-                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }),
+                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(host, htons(port)) }),
                                                  ContainerEq(databaseInfo.hosts));
                                      EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type);
                                      EXPECT_EQ(boost::none, databaseInfo.ns);
@@ -143,49 +236,50 @@ namespace
                                  }));
         }
 
-        void expectGetReplyType(ReplyMock& mock, const Reply::Type& type)
+        void expectMasterIquiryReply()
         {
-            EXPECT_CALL(mock, getType())
-                .Times(1)
-                .WillOnce(Return(type));
+            expectGetType(masterInquiryReplyMock, Reply::Type::ARRAY);
+            expectGetArray(masterInquiryReplyMock, masterInquiryReply);
+            expectGetType(*masterInquiryReplyHost, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyHost, hostDataItem);
+            expectGetType(*masterInquiryReplyPort, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyPort, portDataItem);
         }
 
-        void expectGetReplyArray_ReturnMasterInquiryReply()
+        void expectMasterInquiryRetryTimer()
         {
-            EXPECT_CALL(replyMock, getArray())
+            EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
                 .Times(1)
-                .WillOnce(Return(&masterInquiryReply));
+                .WillOnce(SaveArg<2>(&savedMasterInquiryRetryTimerCallback));
         }
 
-        void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item)
+        void expectSubscribeRetryTimer()
         {
-            EXPECT_CALL(mock, getString())
+            EXPECT_CALL(*engineMock, armTimer(_, expectedSubscribeRetryTimerDuration, _))
                 .Times(1)
-                .WillOnce(Return(&item));
+                .WillOnce(SaveArg<2>(&savedSubscribeRetryTimerCallback));
         }
 
-        void expectMasterIquiryReply()
+        void setStateChangedCbExpectsBeforeMasterInquiry()
         {
-            expectGetReplyType(replyMock, Reply::Type::ARRAY);
-            expectGetReplyArray_ReturnMasterInquiryReply();
-            expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyHost, hostDataItem);
-            expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyPort, portDataItem);
-        }
-
-        void expectMasterInquiryRetryTimer()
-        {
-            EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
-                .Times(1)
-                .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+            expectSubscriberRegisterDisconnectCb();
+            expectSubscriberWaitConnectedAsync();
+            asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::stateChangedCb,
+                    this,
+                    std::placeholders::_1));
+            expectSubscribeNotifications();
+            subscriberConnectAck();
+            expectSubscribeReply();
+            expectDispatcherWaitConnectedAsync();
+            savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+            expectMasterInquiry();
         }
 
         void setDefaultResponsesForMasterInquiryReplyParsing()
         {
-            ON_CALL(replyMock, getType())
+            ON_CALL(masterInquiryReplyMock, getType())
                 .WillByDefault(Return(Reply::Type::ARRAY));
-            ON_CALL(replyMock, getArray())
+            ON_CALL(masterInquiryReplyMock, getArray())
                 .WillByDefault(Return(&masterInquiryReply));
             ON_CALL(*masterInquiryReplyHost, getType())
                 .WillByDefault(Return(Reply::Type::STRING));
@@ -196,6 +290,68 @@ namespace
             ON_CALL(*masterInquiryReplyHost, getString())
                 .WillByDefault(Return(&portDataItem));
         }
+
+        void expectGetType(ReplyMock& mock, const Reply::Type& type)
+        {
+            EXPECT_CALL(mock, getType())
+                .Times(1)
+                .WillOnce(Return(type));
+        }
+
+        void expectGetString(ReplyMock& mock, const Reply::DataItem& item)
+        {
+            EXPECT_CALL(mock, getString())
+                .Times(1)
+                .WillOnce(Return(&item));
+        }
+
+        void expectGetInteger(ReplyMock& mock, int value)
+        {
+            EXPECT_CALL(mock, getInteger())
+                .Times(1)
+                .WillOnce(Return(value));
+        }
+
+        void expectGetArray(ReplyMock& mock, Reply::ReplyVector& replyVector)
+        {
+            EXPECT_CALL(mock, getArray())
+                .Times(1)
+                .WillOnce(Return(&replyVector));
+        }
+
+        void expectSubscribeReply()
+        {
+            expectGetType(subscribeReplyMock, Reply::Type::ARRAY);
+            expectGetArray(subscribeReplyMock, subscribeReplyVector);
+            expectGetType(*subscribeReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*subscribeReplyArrayElement0, subscribeDataItem);
+        }
+
+        void expectNotificationReply()
+        {
+            expectGetType(notificationReplyMock, Reply::Type::ARRAY);
+            expectGetArray(notificationReplyMock, notificationReplyVector);
+            expectGetType(*notificationReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement0, notificationDataItem);
+            expectGetType(*notificationReplyArrayElement2, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement2, notificationMessageDataItem);
+        }
+
+        void setDefaultResponsesForNotificationReplyParsing()
+        {
+            ON_CALL(notificationReplyMock, getType())
+                .WillByDefault(Return(Reply::Type::ARRAY));
+            ON_CALL(notificationReplyMock, getArray())
+                .WillByDefault(Return(&notificationReplyVector));
+            ON_CALL(*notificationReplyArrayElement0, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement0, getString())
+                .WillByDefault(Return(&notificationDataItem));
+            ON_CALL(*notificationReplyArrayElement2, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement2, getString())
+                .WillByDefault(Return(&notificationMessageDataItem));
+        }
     };
 
     class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest
@@ -203,21 +359,42 @@ namespace
     public:
         AsyncSentinelDatabaseDiscoveryTest()
         {
-            expectNewDispatcherCreated();
+            expectDispatchersCreated();
             asyncSentinelDatabaseDiscovery.reset(
                     new AsyncSentinelDatabaseDiscovery(
                             engineMock,
                             logger,
                             std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator,
-                                      this,
-                                      std::placeholders::_1,
-                                      std::placeholders::_2,
-                                      std::placeholders::_3),
+                                      this),
                             contentsBuilderMock));
         }
+
+        ~AsyncSentinelDatabaseDiscoveryTest()
+        {
+            EXPECT_CALL(*subscriberMock, disableCommandCallbacks())
+                .Times(1);
+            EXPECT_CALL(*dispatcherMock, disableCommandCallbacks())
+                .Times(1);
+        }
+    };
+
+    class AsyncSentinelDatabaseDiscoveryInListeningModeTest: public AsyncSentinelDatabaseDiscoveryTest
+    {
+    public:
+        AsyncSentinelDatabaseDiscoveryInListeningModeTest()
+        {
+            InSequence dummy;
+            setStateChangedCbExpectsBeforeMasterInquiry();
+            dispatcherConnectAck();
+            expectMasterIquiryReply();
+            expectStateChangedCb(someHost, somePort);
+            savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+        }
     };
 
     using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest;
+
+    using AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest = AsyncSentinelDatabaseDiscoveryInListeningModeTest;
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable)
@@ -233,115 +410,185 @@ TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, ImplementsAsyncDatabaseDiscovery)
     EXPECT_TRUE((std::is_base_of<AsyncDatabaseDiscovery, AsyncSentinelDatabaseDiscovery>::value));
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, SettingChangedCallbackTriggersSentinelNotificationsSubscriptionAndMasterInquiry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, MasterInquiryErrorTriggersRetry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
-    ON_CALL(replyMock, getType())
+    ON_CALL(masterInquiryReplyMock, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyHost, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyPort, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     std::string invalidPort("invalidPort");
     Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())});
     ON_CALL(*masterInquiryReplyPort, getString())
         .WillByDefault(Return(&invalidPortDataItem));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
     asyncSentinelDatabaseDiscovery->clearStateChangedCb();
     EXPECT_CALL(*this, stateChangedCb(_))
         .Times(0);
-    savedCommandCb(std::error_code(), replyMock);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, ChangeNotificationFromSentinel)
+{
+    InSequence dummy;
+    setStateChangedCbExpectsBeforeMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+    expectNotificationReply();
+    expectStateChangedCb(someOtherHost, someOtherPort);
+    savedSubscriberCommandCb(std::error_code(), notificationReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscribeCommandErrorTriggersRetry)
+{
+    InSequence dummy;
+    expectSubscribeRetryTimer();
+    savedSubscriberCommandCb(getWellKnownErrorCode(), subscribeReplyMock);
+    expectSubscribeNotifications();
+    savedSubscribeRetryTimerCallback();
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidReplyType)
+{
+    InSequence dummy;
+    ON_CALL(notificationReplyMock, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKindElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement0, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKind)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidKind("invalidKind");
+    Reply::DataItem invalidKindDataItem({invalidKind,ReplyStringLength(invalidKind.length())});
+    ON_CALL(*notificationReplyArrayElement0, getString())
+        .WillByDefault(Return(&invalidKindDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement2, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageStructure)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 5678");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidPort)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 newHost invalidPort");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscriberDisconnectCallbackTriggersSubscriptionRenewal)
+{
+    InSequence dummy;
+    expectSubscriberWaitConnectedAsync();
+    subscriberDisconnectCb();
+    expectSubscribeNotifications();
+    subscriberConnectAck();
+    expectSubscribeReply();
+    expectDispatcherWaitConnectedAsync();
+    savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+    expectMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }