Add first version
[ric-plt/sdl.git] / tst / asynchiredisclustercommanddispatcher_test.cpp
diff --git a/tst/asynchiredisclustercommanddispatcher_test.cpp b/tst/asynchiredisclustercommanddispatcher_test.cpp
new file mode 100644 (file)
index 0000000..14421f8
--- /dev/null
@@ -0,0 +1,1026 @@
+/*
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#include <type_traits>
+#include <memory>
+#include <cstring>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <gtest/gtest.h>
+#include <async.h>
+#include "private/createlogger.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
+#include "private/redis/redisgeneral.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/contents.hpp"
+#include "private/tst/hiredisclustersystemmock.hpp"
+#include "private/timer.hpp"
+#include "private/tst/contentsbuildermock.hpp"
+#include "private/tst/enginemock.hpp"
+#include "private/tst/hiredisclusterepolladaptermock.hpp"
+#include "private/tst/redisreplybuilder.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+using namespace shareddatalayer::tst;
+using namespace testing;
+
+namespace
+{
+    class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
+    {
+    public:
+        std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
+        StrictMock<EngineMock> engineMock;
+        HiredisClusterSystemMock hiredisClusterSystemMock;
+        std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
+        redisClusterAsyncContext acc;
+        redisAsyncContext ac;
+        int hiredisFd;
+        std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
+        void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
+        void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
+        Timer::Callback savedConnectionRetryTimerCallback;
+        Timer::Duration expectedRetryTimerDuration;
+        Contents contents;
+        Contents clusterConnectionSetupContents;
+        RedisReplyBuilder redisReplyBuilder;
+        const AsyncConnection::Namespace defaultNamespace;
+        std::shared_ptr<Logger> logger;
+
+        AsyncHiredisClusterCommandDispatcherBaseTest():
+            contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
+            adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
+            acc { },
+            ac { },
+            hiredisFd(3),
+            connected(nullptr),
+            disconnected(nullptr),
+            expectedRetryTimerDuration(std::chrono::seconds(1)),
+            contents { { "CMD", "key1", "value1", "key2", "value2" },
+                       { 3, 4, 6, 4, 6 } },
+            redisReplyBuilder { },
+            defaultNamespace("namespace"),
+            logger(createLogger(SDL_LOG_PREFIX))
+        {
+        }
+
+        virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
+        {
+        }
+
+        MOCK_METHOD0(connectAck, void());
+
+        MOCK_METHOD0(disconnectCallback, void());
+
+        MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
+
+        void expectationsUntilConnect()
+        {
+            expectationsUntilConnect(acc);
+        }
+
+        void expectationsUntilConnect(redisClusterAsyncContext& acc)
+        {
+            expectRedisClusterAsyncConnect(acc);
+        }
+
+        void expectRedisClusterAsyncConnect()
+        {
+            expectRedisClusterAsyncConnect(acc);
+        }
+
+        void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
+                                                                           HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
+                .Times(1)
+                .WillOnce(InvokeWithoutArgs([this, &acc]()
+                                            {
+                                                return &acc;
+                                            }));
+        }
+
+        void expectRedisClusterAsyncConnectReturnNullptr()
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
+                                                                           HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
+                .Times(1)
+                .WillOnce(InvokeWithoutArgs([this]()
+                                            {
+                                                return nullptr;
+                                            }));
+        }
+
+        void expectRedisClusterAsyncSetConnectCallback()
+        {
+            expectRedisClusterAsyncSetConnectCallback(acc);
+        }
+
+        void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
+                .Times(1)
+                .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
+                                 {
+                                     connected = cb;
+                                     return REDIS_OK;
+                                 }));
+        }
+
+        void expectRedisClusterAsyncSetDisconnectCallback()
+        {
+            expectRedisClusterAsyncSetDisconnectCallback(acc);
+        }
+
+        void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
+                .Times(1)
+                .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
+                                 {
+                                     disconnected = cb;
+                                     return REDIS_OK;
+                                 }));
+        }
+
+        void expectCommandListQuery()
+        {
+            expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
+        }
+
+        void expectCommandListQueryReturnError()
+        {
+            expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
+        }
+
+        void expectAdapterSetup()
+        {
+            expectAdapterSetup(acc);
+        }
+
+        void expectAdapterSetup(redisClusterAsyncContext& acc)
+        {
+            EXPECT_CALL(*adapterMock, setup(&acc))
+                .Times(1);
+        }
+
+        void expectAdapterDetach()
+        {
+            EXPECT_CALL(*adapterMock, detach(&ac))
+                .Times(1);
+        }
+
+        void expectConnectAck()
+        {
+            EXPECT_CALL(*this, connectAck())
+                .Times(1);
+        }
+
+        void expectDisconnectCallback()
+        {
+            EXPECT_CALL(*this, disconnectCallback())
+                .Times(1);
+        }
+
+        void expectRedisClusterAsyncFree()
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
+                .Times(1);
+        }
+
+        void expectRedisClusterAsyncDisconnect()
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
+                .Times(1);
+        }
+
+        void verifyAckErrorReply(const Reply& reply)
+        {
+            EXPECT_EQ(Reply::Type::NIL, reply.getType());
+            EXPECT_EQ(0, reply.getInteger());
+            EXPECT_TRUE(reply.getString()->str.empty());
+            EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+            EXPECT_TRUE(reply.getArray()->empty());
+        }
+
+        void expectAckError()
+        {
+            EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
+                .Times(1)
+                .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                                 {
+                                     verifyAckErrorReply(reply);
+                                 }));
+        }
+
+        void expectAckError(const std::error_code& ec)
+        {
+            EXPECT_CALL(*this, ack(ec, _))
+                .Times(1)
+                .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                                 {
+                                     verifyAckErrorReply(reply);
+                                 }));
+        }
+
+        void expectArmConnectionRetryTimer()
+        {
+            EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
+                .Times(1)
+                .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+        }
+
+        void expectDisarmConnectionRetryTimer()
+        {
+            EXPECT_CALL(engineMock, disarmTimer(_))
+                .Times(1);
+        }
+
+        void expectRedisClusterAsyncCommandArgv(redisReply& rr)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+                .Times(1)
+                .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
+                                       int, const char**, const size_t*)
+                                 {
+                                     cb(acc, &rr, pd);
+                                     return REDIS_OK;
+                                 }));
+        }
+
+        void expectAck()
+        {
+            EXPECT_CALL(*this, ack(std::error_code(), _))
+                .Times(1);
+        }
+
+        void expectReplyError(const std::string& msg)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+                .Times(1)
+                .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
+                                    int, int, const char**, const size_t*)
+                                 {
+                                     cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
+                                     return REDIS_OK;
+                                 }));
+        }
+
+        void expectContextError(int code)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+                .Times(1)
+                .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
+                                        int, const char**, const size_t*)
+                                 {
+                                     acc->err = code;
+                                     cb(acc, nullptr, pd);
+                                     return REDIS_OK;
+                                 }));
+        }
+
+        void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
+                .Times(1)
+                .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
+                                 {
+                                     cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+                                 }));
+        }
+
+        void expectAckNotCalled()
+        {
+            EXPECT_CALL(*this, ack(_,_))
+                .Times(0);
+        }
+
+        void expectionsForSuccessfullConnectionSetup()
+        {
+            expectationsUntilConnect();
+            expectAdapterSetup();
+            expectRedisClusterAsyncSetConnectCallback();
+            expectRedisClusterAsyncSetDisconnectCallback();
+            expectCommandListQuery();
+        }
+
+        void callConnectionRetryTimerCallback()
+        {
+            ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
+            savedConnectionRetryTimerCallback();
+        }
+    };
+
+    class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+    {
+    public:
+        AsyncHiredisClusterCommandDispatcherDisconnectedTest()
+        {
+            InSequence dummy;
+            expectationsUntilConnect();
+            expectAdapterSetup();
+            expectRedisClusterAsyncSetConnectCallback();
+            expectRedisClusterAsyncSetDisconnectCallback();
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+                .Times(1);
+            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                                      defaultNamespace,
+                                                                      { { "addr1", 111 }, { "addr2", 222 } },
+                                                                      contentsBuilderMock,
+                                                                      false,
+                                                                      hiredisClusterSystemMock,
+                                                                      adapterMock,
+                                                                      logger));
+        }
+
+        ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
+        {
+        }
+    };
+
+    class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+    {
+    public:
+        AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
+        {
+            InSequence dummy;
+            expectionsForSuccessfullConnectionSetup();
+            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                                      defaultNamespace,
+                                                                      { { "addr1", 111 }, { "addr2", 222 } },
+                                                                      contentsBuilderMock,
+                                                                      true,
+                                                                      hiredisClusterSystemMock,
+                                                                      adapterMock,
+                                                                      logger));
+        }
+
+        ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
+        {
+            expectRedisClusterAsyncFree();
+        }
+    };
+
+    class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+    {
+    public:
+        redisClusterCallbackFn* savedCb;
+        void* savedPd;
+
+        AsyncHiredisClusterCommandDispatcherConnectedTest():
+            savedCb(nullptr),
+            savedPd(nullptr)
+        {
+            InSequence dummy;
+            expectionsForSuccessfullConnectionSetup();
+            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                                      defaultNamespace,
+                                                                      { { "addr1", 111 }, { "addr2", 222 } },
+                                                                      contentsBuilderMock,
+                                                                      false,
+                                                                      hiredisClusterSystemMock,
+                                                                      adapterMock,
+                                                                      logger));
+            connected(&acc, &ac, 0);
+        }
+
+        ~AsyncHiredisClusterCommandDispatcherConnectedTest()
+        {
+            expectRedisClusterAsyncFree();
+        }
+
+        void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
+        {
+            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+                .Times(1)
+                .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+                                        const char*, int, int, const char**, const size_t*)
+                                 {
+                                     savedCb = cb;
+                                     savedPd = pd;
+                                     return REDIS_OK;
+                                 }));
+        }
+    };
+
+    using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
+{
+    EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
+    EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
+{
+    EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
+{
+    Engine::Callback storedCallback;
+    EXPECT_CALL(engineMock, postCallback(_))
+        .Times(1)
+        .WillOnce(SaveArg<0>(&storedCallback));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              { });
+    expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
+    storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
+{
+    InSequence dummy;
+    acc.err = 123;
+    expectationsUntilConnect();
+    expectArmConnectionRetryTimer();
+    expectDisarmConnectionRetryTimer();
+
+    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                              defaultNamespace,
+                                                              { { "addr1", 111 }, { "addr2", 222 } },
+                                                              contentsBuilderMock,
+                                                              false,
+                                                              hiredisClusterSystemMock,
+                                                              adapterMock,
+                                                              logger));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
+{
+    InSequence dummy;
+    expectRedisClusterAsyncConnectReturnNullptr();
+    expectArmConnectionRetryTimer();
+    expectDisarmConnectionRetryTimer();
+
+    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                              defaultNamespace,
+                                                              { { "addr1", 111 }, { "addr2", 222 } },
+                                                              contentsBuilderMock,
+                                                              false,
+                                                              hiredisClusterSystemMock,
+                                                              adapterMock,
+                                                              logger));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
+{
+    InSequence dummy;
+    Engine::Callback storedCallback;
+    expectationsUntilConnect();
+    expectAdapterSetup();
+    expectRedisClusterAsyncSetConnectCallback();
+    expectRedisClusterAsyncSetDisconnectCallback();
+    expectCommandListQueryReturnError();
+    expectArmConnectionRetryTimer();
+
+    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                              defaultNamespace,
+                                                              { { "addr1", 111 }, { "addr2", 222 } },
+                                                              contentsBuilderMock,
+                                                              false,
+                                                              hiredisClusterSystemMock,
+                                                              adapterMock,
+                                                              logger));
+
+    expectDisarmConnectionRetryTimer();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
+{
+    InSequence dummy;
+    expectRedisClusterAsyncConnectReturnNullptr();
+    expectArmConnectionRetryTimer();
+
+    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+                                                              defaultNamespace,
+                                                              { { "addr1", 111 }, { "addr2", 222 } },
+                                                              contentsBuilderMock,
+                                                              false,
+                                                              hiredisClusterSystemMock,
+                                                              adapterMock,
+                                                              logger));
+
+    expectionsForSuccessfullConnectionSetup();
+    expectRedisClusterAsyncFree();
+
+    callConnectionRetryTimerCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
+{
+    Engine::Callback storedCallback;
+    EXPECT_CALL(engineMock, postCallback(_))
+        .Times(1)
+        .WillOnce(SaveArg<0>(&storedCallback));
+    dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
+                                             this));
+    expectConnectAck();
+    storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
+{
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
+                                int keylen, int argc, const char** argv, const size_t* argvlen)
+                         {
+                             EXPECT_STREQ(defaultNamespace.c_str(), key);
+                             EXPECT_EQ(9, keylen);
+                             EXPECT_EQ((int)contents.stack.size(), argc);
+                             EXPECT_EQ(contents.sizes[0], argvlen[0]);
+                             EXPECT_EQ(contents.sizes[1], argvlen[1]);
+                             EXPECT_EQ(contents.sizes[2], argvlen[2]);
+                             EXPECT_EQ(contents.sizes[3], argvlen[3]);
+                             EXPECT_EQ(contents.sizes[4], argvlen[4]);
+                             EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
+                             EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
+                             EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
+                             EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
+                             EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
+                             cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+                             return REDIS_OK;
+                         }));
+    expectAck();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
+{
+    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+        .Times(1)
+        .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
+                         {
+                             EXPECT_EQ(Reply::Type::NIL, reply.getType());
+                             EXPECT_EQ(0, reply.getInteger());
+                             EXPECT_TRUE(reply.getString()->str.empty());
+                             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+                             EXPECT_TRUE(reply.getArray()->empty());
+                         }));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
+{
+    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+        .Times(1)
+        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                         {
+                             auto expected(redisReplyBuilder.buildIntegerReply());
+                             EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
+                             EXPECT_EQ(expected.integer, reply.getInteger());
+                         }));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
+{
+    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+        .Times(1)
+        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                         {
+                             auto expected(redisReplyBuilder.buildStatusReply());
+                             EXPECT_EQ(Reply::Type::STATUS, reply.getType());
+                             EXPECT_EQ(expected.len, reply.getString()->len);
+                             EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+                         }));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
+{
+    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+        .Times(1)
+        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                         {
+                             auto expected(redisReplyBuilder.buildStringReply());
+                             EXPECT_EQ(Reply::Type::STRING, reply.getType());
+                             EXPECT_EQ(expected.len, reply.getString()->len);
+                             EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+                         }));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
+{
+    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+        .Times(1)
+        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+                         {
+                             auto array(reply.getArray());
+                             EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
+                             EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
+                             EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
+                         }));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
+{
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
+                            const char**, const size_t*)
+                         {
+                             acc->err = REDIS_ERR;
+                             return REDIS_ERR;
+                         }));
+    Engine::Callback storedCallback;
+    EXPECT_CALL(engineMock, postCallback(_))
+        .Times(1)
+        .WillOnce(SaveArg<0>(&storedCallback));
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+    expectAckError();
+    storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
+{
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
+                            const char**, const size_t*)
+                         {
+                             cb(acc, nullptr, pd);
+                             return REDIS_OK;
+                         }));
+    expectAckError();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
+{
+    expectReplyError("LOADING Redis is loading the dataset in memory");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
+{
+    //SDL checks only that reply starts with CLUSTERDOWN string
+    expectReplyError("CLUSTERDOWN");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+
+    expectReplyError("CLUSTERDOWN The cluster is down");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+
+    expectReplyError("CLUSTERDOW");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
+{
+    expectReplyError("ERR Protocol error: invalid bulk length");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
+{
+    expectReplyError("something sinister");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
+{
+    expectReplyError("");
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
+{
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
+                            const char**, const size_t*)
+                         {
+                             acc->err = REDIS_ERR_IO;
+                             errno = EINVAL;
+                             cb(acc, nullptr, pd);
+                             return REDIS_OK;
+                         }));
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
+{
+    expectContextError(REDIS_ERR_EOF);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
+{
+    expectContextError(REDIS_ERR_PROTOCOL);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
+{
+    expectContextError(REDIS_ERR_OOM);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
+{
+    expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
+{
+    expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
+{
+    expectContextError(REDIS_ERR_OTHER);
+    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+        .Times(1);
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
+{
+    InSequence dummy;
+    expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+    expectAck();
+    savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
+    dispatcher->disableCommandCallbacks();
+    expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+    expectAckNotCalled();
+    savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
+{
+    InSequence dummy;
+    expectAdapterDetach();
+    disconnected(&acc, &ac, 0);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
+{
+    InSequence dummy;
+    dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
+                                             this));
+    expectAdapterDetach();
+    expectDisconnectCallback();
+    disconnected(&acc, &ac, 0);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
+{
+    InSequence dummy;
+    redisClusterCallbackFn* savedCb;
+    void* savedPd;
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+                                              const char*, int, int, const char**, const size_t*)
+                         {
+                             savedCb = cb;
+                             savedPd = pd;
+                             return REDIS_OK;
+                         }));
+    Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              defaultNamespace,
+                              contents);
+    EXPECT_CALL(*this, ack(std::error_code(), _))
+            .Times(3);
+    redisReply rr;
+    rr.type = REDIS_REPLY_NIL;
+    savedCb(&acc, &rr, savedPd);
+    savedCb(&acc, &rr, savedPd);
+    savedCb(&acc, &rr, savedPd);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
+{
+    InSequence dummy;
+    redisClusterCallbackFn* savedCb;
+    void* savedPd;
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
+                                                    const char*, int, int, const char**, const size_t*)
+                         {
+                             savedCb = cb;
+                             savedPd = pd;
+                             cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+                             return REDIS_OK;
+                         }));
+    expectAck();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+    EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
+{
+    InSequence dummy;
+    redisClusterCallbackFn* savedCb;
+    void* savedPd;
+    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+        .Times(1)
+        .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+                                              const char*, int, int, const char**, const size_t*)
+                         {
+                             savedCb = cb;
+                             savedPd = pd;
+                             return REDIS_OK;
+                         }));
+    Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+    expectAck();
+    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                                        defaultNamespace,
+                                        contents);
+    savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
+    EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}