--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include <type_traits>
+#include <memory>
+#include <cstring>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <gtest/gtest.h>
+#include <async.h>
+#include "private/createlogger.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
+#include "private/redis/redisgeneral.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/contents.hpp"
+#include "private/tst/hiredisclustersystemmock.hpp"
+#include "private/timer.hpp"
+#include "private/tst/contentsbuildermock.hpp"
+#include "private/tst/enginemock.hpp"
+#include "private/tst/hiredisclusterepolladaptermock.hpp"
+#include "private/tst/redisreplybuilder.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+using namespace shareddatalayer::tst;
+using namespace testing;
+
+namespace
+{
+ class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
+ {
+ public:
+ std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
+ StrictMock<EngineMock> engineMock;
+ HiredisClusterSystemMock hiredisClusterSystemMock;
+ std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
+ redisClusterAsyncContext acc;
+ redisAsyncContext ac;
+ int hiredisFd;
+ std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
+ void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
+ void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
+ Timer::Callback savedConnectionRetryTimerCallback;
+ Timer::Duration expectedRetryTimerDuration;
+ Contents contents;
+ Contents clusterConnectionSetupContents;
+ RedisReplyBuilder redisReplyBuilder;
+ const AsyncConnection::Namespace defaultNamespace;
+ std::shared_ptr<Logger> logger;
+
+ AsyncHiredisClusterCommandDispatcherBaseTest():
+ contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
+ adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
+ acc { },
+ ac { },
+ hiredisFd(3),
+ connected(nullptr),
+ disconnected(nullptr),
+ expectedRetryTimerDuration(std::chrono::seconds(1)),
+ contents { { "CMD", "key1", "value1", "key2", "value2" },
+ { 3, 4, 6, 4, 6 } },
+ redisReplyBuilder { },
+ defaultNamespace("namespace"),
+ logger(createLogger(SDL_LOG_PREFIX))
+ {
+ }
+
+ virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
+ {
+ }
+
+ MOCK_METHOD0(connectAck, void());
+
+ MOCK_METHOD0(disconnectCallback, void());
+
+ MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
+
+ void expectationsUntilConnect()
+ {
+ expectationsUntilConnect(acc);
+ }
+
+ void expectationsUntilConnect(redisClusterAsyncContext& acc)
+ {
+ expectRedisClusterAsyncConnect(acc);
+ }
+
+ void expectRedisClusterAsyncConnect()
+ {
+ expectRedisClusterAsyncConnect(acc);
+ }
+
+ void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
+ HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([this, &acc]()
+ {
+ return &acc;
+ }));
+ }
+
+ void expectRedisClusterAsyncConnectReturnNullptr()
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
+ HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([this]()
+ {
+ return nullptr;
+ }));
+ }
+
+ void expectRedisClusterAsyncSetConnectCallback()
+ {
+ expectRedisClusterAsyncSetConnectCallback(acc);
+ }
+
+ void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
+ {
+ connected = cb;
+ return REDIS_OK;
+ }));
+ }
+
+ void expectRedisClusterAsyncSetDisconnectCallback()
+ {
+ expectRedisClusterAsyncSetDisconnectCallback(acc);
+ }
+
+ void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
+ {
+ disconnected = cb;
+ return REDIS_OK;
+ }));
+ }
+
+ void expectCommandListQuery()
+ {
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
+ }
+
+ void expectCommandListQueryReturnError()
+ {
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
+ }
+
+ void expectAdapterSetup()
+ {
+ expectAdapterSetup(acc);
+ }
+
+ void expectAdapterSetup(redisClusterAsyncContext& acc)
+ {
+ EXPECT_CALL(*adapterMock, setup(&acc))
+ .Times(1);
+ }
+
+ void expectAdapterDetach()
+ {
+ EXPECT_CALL(*adapterMock, detach(&ac))
+ .Times(1);
+ }
+
+ void expectConnectAck()
+ {
+ EXPECT_CALL(*this, connectAck())
+ .Times(1);
+ }
+
+ void expectDisconnectCallback()
+ {
+ EXPECT_CALL(*this, disconnectCallback())
+ .Times(1);
+ }
+
+ void expectRedisClusterAsyncFree()
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
+ .Times(1);
+ }
+
+ void expectRedisClusterAsyncDisconnect()
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
+ .Times(1);
+ }
+
+ void verifyAckErrorReply(const Reply& reply)
+ {
+ EXPECT_EQ(Reply::Type::NIL, reply.getType());
+ EXPECT_EQ(0, reply.getInteger());
+ EXPECT_TRUE(reply.getString()->str.empty());
+ EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+ EXPECT_TRUE(reply.getArray()->empty());
+ }
+
+ void expectAckError()
+ {
+ EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ verifyAckErrorReply(reply);
+ }));
+ }
+
+ void expectAckError(const std::error_code& ec)
+ {
+ EXPECT_CALL(*this, ack(ec, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ verifyAckErrorReply(reply);
+ }));
+ }
+
+ void expectArmConnectionRetryTimer()
+ {
+ EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
+ .Times(1)
+ .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+ }
+
+ void expectDisarmConnectionRetryTimer()
+ {
+ EXPECT_CALL(engineMock, disarmTimer(_))
+ .Times(1);
+ }
+
+ void expectRedisClusterAsyncCommandArgv(redisReply& rr)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
+ int, const char**, const size_t*)
+ {
+ cb(acc, &rr, pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectAck()
+ {
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1);
+ }
+
+ void expectReplyError(const std::string& msg)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
+ int, int, const char**, const size_t*)
+ {
+ cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectContextError(int code)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
+ int, const char**, const size_t*)
+ {
+ acc->err = code;
+ cb(acc, nullptr, pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
+ .Times(1)
+ .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
+ {
+ cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+ }));
+ }
+
+ void expectAckNotCalled()
+ {
+ EXPECT_CALL(*this, ack(_,_))
+ .Times(0);
+ }
+
+ void expectionsForSuccessfullConnectionSetup()
+ {
+ expectationsUntilConnect();
+ expectAdapterSetup();
+ expectRedisClusterAsyncSetConnectCallback();
+ expectRedisClusterAsyncSetDisconnectCallback();
+ expectCommandListQuery();
+ }
+
+ void callConnectionRetryTimerCallback()
+ {
+ ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
+ savedConnectionRetryTimerCallback();
+ }
+ };
+
+ class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+ {
+ public:
+ AsyncHiredisClusterCommandDispatcherDisconnectedTest()
+ {
+ InSequence dummy;
+ expectationsUntilConnect();
+ expectAdapterSetup();
+ expectRedisClusterAsyncSetConnectCallback();
+ expectRedisClusterAsyncSetDisconnectCallback();
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1);
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+ }
+
+ ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
+ {
+ }
+ };
+
+ class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+ {
+ public:
+ AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
+ {
+ InSequence dummy;
+ expectionsForSuccessfullConnectionSetup();
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ true,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+ }
+
+ ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
+ {
+ expectRedisClusterAsyncFree();
+ }
+ };
+
+ class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
+ {
+ public:
+ redisClusterCallbackFn* savedCb;
+ void* savedPd;
+
+ AsyncHiredisClusterCommandDispatcherConnectedTest():
+ savedCb(nullptr),
+ savedPd(nullptr)
+ {
+ InSequence dummy;
+ expectionsForSuccessfullConnectionSetup();
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+ connected(&acc, &ac, 0);
+ }
+
+ ~AsyncHiredisClusterCommandDispatcherConnectedTest()
+ {
+ expectRedisClusterAsyncFree();
+ }
+
+ void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
+ {
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+ const char*, int, int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ }
+ };
+
+ using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
+{
+ EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
+ EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
+{
+ EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
+{
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ { });
+ expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
+ storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
+{
+ InSequence dummy;
+ acc.err = 123;
+ expectationsUntilConnect();
+ expectArmConnectionRetryTimer();
+ expectDisarmConnectionRetryTimer();
+
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
+{
+ InSequence dummy;
+ expectRedisClusterAsyncConnectReturnNullptr();
+ expectArmConnectionRetryTimer();
+ expectDisarmConnectionRetryTimer();
+
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
+{
+ InSequence dummy;
+ Engine::Callback storedCallback;
+ expectationsUntilConnect();
+ expectAdapterSetup();
+ expectRedisClusterAsyncSetConnectCallback();
+ expectRedisClusterAsyncSetDisconnectCallback();
+ expectCommandListQueryReturnError();
+ expectArmConnectionRetryTimer();
+
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+
+ expectDisarmConnectionRetryTimer();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
+{
+ InSequence dummy;
+ expectRedisClusterAsyncConnectReturnNullptr();
+ expectArmConnectionRetryTimer();
+
+ dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
+ defaultNamespace,
+ { { "addr1", 111 }, { "addr2", 222 } },
+ contentsBuilderMock,
+ false,
+ hiredisClusterSystemMock,
+ adapterMock,
+ logger));
+
+ expectionsForSuccessfullConnectionSetup();
+ expectRedisClusterAsyncFree();
+
+ callConnectionRetryTimerCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
+{
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
+ this));
+ expectConnectAck();
+ storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
+{
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
+ int keylen, int argc, const char** argv, const size_t* argvlen)
+ {
+ EXPECT_STREQ(defaultNamespace.c_str(), key);
+ EXPECT_EQ(9, keylen);
+ EXPECT_EQ((int)contents.stack.size(), argc);
+ EXPECT_EQ(contents.sizes[0], argvlen[0]);
+ EXPECT_EQ(contents.sizes[1], argvlen[1]);
+ EXPECT_EQ(contents.sizes[2], argvlen[2]);
+ EXPECT_EQ(contents.sizes[3], argvlen[3]);
+ EXPECT_EQ(contents.sizes[4], argvlen[4]);
+ EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
+ EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
+ EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
+ EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
+ EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
+ cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+ return REDIS_OK;
+ }));
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
+{
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
+ {
+ EXPECT_EQ(Reply::Type::NIL, reply.getType());
+ EXPECT_EQ(0, reply.getInteger());
+ EXPECT_TRUE(reply.getString()->str.empty());
+ EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+ EXPECT_TRUE(reply.getArray()->empty());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
+{
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildIntegerReply());
+ EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
+ EXPECT_EQ(expected.integer, reply.getInteger());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
+{
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildStatusReply());
+ EXPECT_EQ(Reply::Type::STATUS, reply.getType());
+ EXPECT_EQ(expected.len, reply.getString()->len);
+ EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
+{
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildStringReply());
+ EXPECT_EQ(Reply::Type::STRING, reply.getType());
+ EXPECT_EQ(expected.len, reply.getString()->len);
+ EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
+{
+ expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto array(reply.getArray());
+ EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
+ EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
+ EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
+{
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
+ const char**, const size_t*)
+ {
+ acc->err = REDIS_ERR;
+ return REDIS_ERR;
+ }));
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAckError();
+ storedCallback();
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
+{
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
+ const char**, const size_t*)
+ {
+ cb(acc, nullptr, pd);
+ return REDIS_OK;
+ }));
+ expectAckError();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
+{
+ expectReplyError("LOADING Redis is loading the dataset in memory");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
+{
+ //SDL checks only that reply starts with CLUSTERDOWN string
+ expectReplyError("CLUSTERDOWN");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+
+ expectReplyError("CLUSTERDOWN The cluster is down");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+
+ expectReplyError("CLUSTERDOW");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
+{
+ expectReplyError("ERR Protocol error: invalid bulk length");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
+{
+ expectReplyError("something sinister");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
+{
+ expectReplyError("");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
+{
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
+ const char**, const size_t*)
+ {
+ acc->err = REDIS_ERR_IO;
+ errno = EINVAL;
+ cb(acc, nullptr, pd);
+ return REDIS_OK;
+ }));
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
+{
+ expectContextError(REDIS_ERR_EOF);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
+{
+ expectContextError(REDIS_ERR_PROTOCOL);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
+{
+ expectContextError(REDIS_ERR_OOM);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
+{
+ expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
+{
+ expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
+{
+ expectContextError(REDIS_ERR_OTHER);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
+{
+ InSequence dummy;
+ expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAck();
+ savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
+ dispatcher->disableCommandCallbacks();
+ expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAckNotCalled();
+ savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
+{
+ InSequence dummy;
+ expectAdapterDetach();
+ disconnected(&acc, &ac, 0);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
+{
+ InSequence dummy;
+ dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
+ this));
+ expectAdapterDetach();
+ expectDisconnectCallback();
+ disconnected(&acc, &ac, 0);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
+{
+ InSequence dummy;
+ redisClusterCallbackFn* savedCb;
+ void* savedPd;
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+ const char*, int, int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(3);
+ redisReply rr;
+ rr.type = REDIS_REPLY_NIL;
+ savedCb(&acc, &rr, savedPd);
+ savedCb(&acc, &rr, savedPd);
+ savedCb(&acc, &rr, savedPd);
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
+{
+ InSequence dummy;
+ redisClusterCallbackFn* savedCb;
+ void* savedPd;
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
+ const char*, int, int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ cb(acc, &redisReplyBuilder.buildNilReply(), pd);
+ return REDIS_OK;
+ }));
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}
+
+TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
+{
+ InSequence dummy;
+ redisClusterCallbackFn* savedCb;
+ void* savedPd;
+ EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
+ const char*, int, int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
+ EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}