--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include <type_traits>
+#include <memory>
+#include <cstring>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <gtest/gtest.h>
+#include <async.h>
+#include "private/createlogger.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asynchirediscommanddispatcher.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/contents.hpp"
+#include "private/timer.hpp"
+#include "private/tst/contentsbuildermock.hpp"
+#include "private/tst/enginemock.hpp"
+#include "private/tst/hiredissystemmock.hpp"
+#include "private/tst/hiredisepolladaptermock.hpp"
+#include "private/tst/redisreplybuilder.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+using namespace shareddatalayer::tst;
+using namespace testing;
+
+namespace
+{
+ class AsyncHiredisCommandDispatcherBaseTest: public testing::Test
+ {
+ public:
+ std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
+ StrictMock<EngineMock> engineMock;
+ HiredisSystemMock hiredisSystemMock;
+ std::shared_ptr<HiredisEpollAdapterMock> adapterMock;
+ redisAsyncContext ac;
+ int hiredisFd;
+ std::unique_ptr<AsyncHiredisCommandDispatcher> dispatcher;
+ void (*connected)(const redisAsyncContext*, int);
+ void (*disconnected)(const redisAsyncContext*, int);
+ Timer::Callback savedConnectionRetryTimerCallback;
+ Timer::Duration expectedConnectionRetryTimerDuration;
+ Timer::Duration expectedConnectionVerificationRetryTimerDuration;
+ RedisReplyBuilder redisReplyBuilder;
+ const AsyncConnection::Namespace defaultNamespace;
+ std::shared_ptr<Logger> logger;
+
+ AsyncHiredisCommandDispatcherBaseTest():
+ contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
+ adapterMock(std::make_shared<HiredisEpollAdapterMock>(engineMock, hiredisSystemMock)),
+ ac { },
+ hiredisFd(3),
+ connected(nullptr),
+ disconnected(nullptr),
+ expectedConnectionRetryTimerDuration(std::chrono::seconds(1)),
+ expectedConnectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
+ redisReplyBuilder { },
+ defaultNamespace("namespace"),
+ logger(createLogger(SDL_LOG_PREFIX))
+ {
+ }
+
+ virtual ~AsyncHiredisCommandDispatcherBaseTest()
+ {
+ }
+
+ MOCK_METHOD0(connectAck, void());
+
+ MOCK_METHOD0(disconnectCallback, void());
+
+ MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
+
+ void expectationsUntilConnect()
+ {
+ expectationsUntilConnect(ac);
+ }
+
+ void expectationsUntilConnect(redisAsyncContext& ac)
+ {
+ expectRedisAsyncConnect(ac);
+ }
+
+ void expectRedisAsyncConnect()
+ {
+ expectRedisAsyncConnect(ac);
+ }
+
+ void expectRedisAsyncConnect(redisAsyncContext& ac)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([this, &ac]()
+ {
+ ac.c.fd = hiredisFd;
+ return ∾
+ }));
+ }
+
+ void expectRedisAsyncConnectReturnNullptr()
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([this]()
+ {
+ ac.c.fd = hiredisFd;
+ return nullptr;
+ }));
+ }
+
+ void expectRedisAsyncSetConnectCallback()
+ {
+ expectRedisAsyncSetConnectCallback(ac);
+ }
+
+ void expectRedisAsyncSetConnectCallback(redisAsyncContext& ac)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncSetConnectCallback(&ac, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const redisAsyncContext*, redisConnectCallback* cb)
+ {
+ connected = cb;
+ return REDIS_OK;
+ }));
+ }
+
+ void expectRedisAsyncSetDisconnectCallback()
+ {
+ expectRedisAsyncSetDisconnectCallback(ac);
+ }
+
+ void expectRedisAsyncSetDisconnectCallback(redisAsyncContext& ac)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncSetDisconnectCallback(&ac, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const redisAsyncContext*, redisDisconnectCallback* cb)
+ {
+ disconnected = cb;
+ return REDIS_OK;
+ }));
+ }
+
+ void expectAdapterAttach()
+ {
+ expectAdapterAttach(ac);
+ }
+
+ void expectAdapterAttach(redisAsyncContext& ac)
+ {
+ EXPECT_CALL(*adapterMock, attach(&ac))
+ .Times(1);
+ }
+
+ void expectConnectAck()
+ {
+ EXPECT_CALL(*this, connectAck())
+ .Times(1);
+ }
+
+ void expectDisconnectCallback()
+ {
+ EXPECT_CALL(*this, disconnectCallback())
+ .Times(1);
+ }
+
+ void expectRedisAsyncFree()
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
+ .Times(1);
+ }
+
+ void expectRedisAsyncDisconnect()
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncDisconnect(&ac))
+ .Times(1);
+ }
+
+ void expectRedisAsyncCommandArgv(redisReply& rr)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&rr](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ cb(ac, &rr, pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectCommandListQuery()
+ {
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
+ }
+
+ void expectCommandListQueryReturnError()
+ {
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
+ }
+
+ void verifyAckErrorReply(const Reply& reply)
+ {
+ EXPECT_EQ(Reply::Type::NIL, reply.getType());
+ EXPECT_EQ(0, reply.getInteger());
+ EXPECT_TRUE(reply.getString()->str.empty());
+ EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+ EXPECT_TRUE(reply.getArray()->empty());
+ }
+
+ void expectAckError()
+ {
+ EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ verifyAckErrorReply(reply);
+ }));
+ }
+
+ void expectAckError(const std::error_code& ec)
+ {
+ EXPECT_CALL(*this, ack(ec, _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ verifyAckErrorReply(reply);
+ }));
+ }
+
+ void expectArmConnectionRetryTimer()
+ {
+ EXPECT_CALL(engineMock, armTimer(_, expectedConnectionRetryTimerDuration, _))
+ .Times(1)
+ .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+ }
+
+ void expectArmConnectionVerificationRetryTimer()
+ {
+ EXPECT_CALL(engineMock, armTimer(_, expectedConnectionVerificationRetryTimerDuration, _))
+ .Times(1)
+ .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+ }
+
+ void expectDisarmConnectionRetryTimer()
+ {
+ EXPECT_CALL(engineMock, disarmTimer(_))
+ .Times(1);
+ }
+ };
+
+ class AsyncHiredisCommandDispatcherDisconnectedTest: public AsyncHiredisCommandDispatcherBaseTest
+ {
+ public:
+ AsyncHiredisCommandDispatcherDisconnectedTest()
+ {
+ InSequence dummy;
+ expectationsUntilConnect();
+ expectAdapterAttach();
+ expectRedisAsyncSetConnectCallback();
+ expectRedisAsyncSetDisconnectCallback();
+ dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
+ "host",
+ htons(6379U),
+ contentsBuilderMock,
+ false,
+ hiredisSystemMock,
+ adapterMock,
+ logger));
+ }
+
+ ~AsyncHiredisCommandDispatcherDisconnectedTest()
+ {
+ }
+ };
+
+ class AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisCommandDispatcherBaseTest
+ {
+ public:
+ AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
+ {
+ InSequence dummy;
+ expectationsUntilConnect();
+ expectAdapterAttach();
+ expectRedisAsyncSetConnectCallback();
+ expectRedisAsyncSetDisconnectCallback();
+ dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
+ "host",
+ htons(6379U),
+ contentsBuilderMock,
+ true,
+ hiredisSystemMock,
+ adapterMock,
+ logger));
+ }
+
+ ~AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
+ {
+ expectRedisAsyncFree();
+ }
+ };
+
+ class AsyncHiredisCommandDispatcherConnectedTest: public AsyncHiredisCommandDispatcherDisconnectedTest
+ {
+ public:
+ Contents contents;
+ redisCallbackFn* savedCb;
+ void* savedPd;
+
+ AsyncHiredisCommandDispatcherConnectedTest():
+ contents { { "CMD", "key1", "value1", "key2", "value2" },
+ { 3, 4, 6, 4, 6 } },
+ savedCb(nullptr),
+ savedPd(nullptr)
+ {
+ expectCommandListQuery();
+ connected(&ac, 0);
+ }
+
+ ~AsyncHiredisCommandDispatcherConnectedTest()
+ {
+ expectRedisAsyncFree();
+ }
+
+ void expectAck()
+ {
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1);
+ }
+
+ void expectReplyError(const std::string& msg)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this, msg](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ cb(ac, &redisReplyBuilder.buildErrorReply(msg), pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectContextError(int code)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([code](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
+ {
+ ac->err = code;
+ cb(ac, nullptr, pd);
+ return REDIS_OK;
+ }));
+ }
+
+ void expectRedisAsyncFreeCallPendingCallback(redisCallbackFn* cb, void* pd)
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
+ .Times(1)
+ .WillOnce(Invoke([cb, pd](redisAsyncContext* ac)
+ {
+ cb(ac, nullptr, pd);
+ }));
+ }
+
+ void expectAckNotCalled()
+ {
+ EXPECT_CALL(*this, ack(_,_))
+ .Times(0);
+ }
+
+ void expectRedisAsyncCommandArgv_SaveCb()
+ {
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillRepeatedly(Invoke([this](redisAsyncContext*, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ }
+ };
+
+ using AsyncHiredisCommandDispatcherDeathTest = AsyncHiredisCommandDispatcherConnectedTest;
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, IsNotCopyable)
+{
+ EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisCommandDispatcher>::value);
+ EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisCommandDispatcher>::value);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
+{
+ EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisCommandDispatcher>::value));
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
+{
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ { });
+ expectAckError(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED));
+ storedCallback();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ContextErrorInConnectArmsRetryTimer)
+{
+ InSequence dummy;
+ expectationsUntilConnect();
+ expectArmConnectionRetryTimer();
+ ac.err = 123;
+ dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
+ "host",
+ htons(6379U),
+ contentsBuilderMock,
+ false,
+ hiredisSystemMock,
+ adapterMock,
+ logger));
+ expectDisarmConnectionRetryTimer();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
+{
+ InSequence dummy;
+ expectRedisAsyncConnectReturnNullptr();
+ expectArmConnectionRetryTimer();
+ expectDisarmConnectionRetryTimer();
+
+ dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
+ "host",
+ htons(6379U),
+ contentsBuilderMock,
+ false,
+ hiredisSystemMock,
+ adapterMock,
+ logger));
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, FailedCommandListQueryArmsRetryTimer)
+{
+ InSequence dummy;
+ expectCommandListQueryReturnError();
+ expectArmConnectionVerificationRetryTimer();
+ expectRedisAsyncFree();
+ connected(&ac, 0);
+ expectDisarmConnectionRetryTimer();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ErrorInConnectedCallbackArmsRetryTimer)
+{
+ InSequence dummy;
+ expectArmConnectionRetryTimer();
+ connected(&ac, -1);
+ expectDisarmConnectionRetryTimer();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectionSucceedsWithRetryTimer)
+{
+ InSequence dummy;
+ expectArmConnectionRetryTimer();
+
+ connected(&ac, -1);
+
+ expectationsUntilConnect();
+ expectAdapterAttach();
+ expectRedisAsyncSetConnectCallback();
+ expectRedisAsyncSetDisconnectCallback();
+
+ savedConnectionRetryTimerCallback();
+
+ expectCommandListQuery();
+ expectConnectAck();
+
+ dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
+ connected(&ac, 0);
+
+ expectRedisAsyncFree();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledOnceConnected)
+{
+ InSequence dummy;
+ expectCommandListQuery();
+ expectConnectAck();
+ dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
+ connected(&ac, 0);
+ expectRedisAsyncFree();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledIfConnected)
+{
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ expectCommandListQuery();
+ connected(&ac, 0);
+ dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
+ expectConnectAck();
+ storedCallback();
+ expectRedisAsyncFree();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanDispatchCommands)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
+ int argc, const char** argv, const size_t* argvlen)
+ {
+ EXPECT_EQ((int)contents.stack.size(), argc);
+ EXPECT_EQ(contents.sizes[0], argvlen[0]);
+ EXPECT_EQ(contents.sizes[1], argvlen[1]);
+ EXPECT_EQ(contents.sizes[2], argvlen[2]);
+ EXPECT_EQ(contents.sizes[3], argvlen[3]);
+ EXPECT_EQ(contents.sizes[4], argvlen[4]);
+ EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
+ EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
+ EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
+ EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
+ EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
+ cb(ac, &redisReplyBuilder.buildNilReply(), pd);
+ return REDIS_OK;
+ }));
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseNilReply)
+{
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildNilReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
+ {
+ EXPECT_EQ(Reply::Type::NIL, reply.getType());
+ EXPECT_EQ(0, reply.getInteger());
+ EXPECT_TRUE(reply.getString()->str.empty());
+ EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
+ EXPECT_TRUE(reply.getArray()->empty());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseIntegerReply)
+{
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildIntegerReply());
+ EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
+ EXPECT_EQ(expected.integer, reply.getInteger());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStatusReply)
+{
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildStatusReply());
+ EXPECT_EQ(Reply::Type::STATUS, reply.getType());
+ EXPECT_EQ(expected.len, reply.getString()->len);
+ EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStringReply)
+{
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildStringReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
+ {
+ auto expected(redisReplyBuilder.buildStringReply());
+ EXPECT_EQ(Reply::Type::STRING, reply.getType());
+ EXPECT_EQ(expected.len, reply.getString()->len);
+ EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseArrayReply)
+{
+ expectRedisAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(1)
+ .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
+ {
+ auto array(reply.getArray());
+ EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
+ EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
+ EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn*, void*, int, const char**, const size_t*)
+ {
+ ac->err = REDIS_ERR;
+ return REDIS_ERR;
+ }));
+ Engine::Callback storedCallback;
+ EXPECT_CALL(engineMock, postCallback(_))
+ .Times(1)
+ .WillOnce(SaveArg<0>(&storedCallback));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAckError();
+ storedCallback();
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
+ {
+ cb(ac, nullptr, pd);
+ return REDIS_OK;
+ }));
+ expectAckError();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
+{
+ expectReplyError("LOADING Redis is loading the dataset in memory");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
+{
+ expectReplyError("ERR Protocol error: invalid bulk length");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
+{
+ expectReplyError("something sinister");
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContext)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
+ {
+ ac->err = REDIS_ERR_IO;
+ errno = EINVAL;
+ cb(ac, nullptr, pd);
+ return REDIS_OK;
+ }));
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContextWithECONNRESETerrnoValue)
+{
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
+ {
+ ac->err = REDIS_ERR_IO;
+ errno = ECONNRESET;
+ cb(ac, nullptr, pd);
+ return REDIS_OK;
+ }));
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, EofErrorInContext)
+{
+ expectContextError(REDIS_ERR_EOF);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorInContext)
+{
+ expectContextError(REDIS_ERR_PROTOCOL);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, OomErrorInContext)
+{
+ expectContextError(REDIS_ERR_OOM);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
+{
+ expectContextError(REDIS_ERR_OTHER);
+ EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
+ .Times(1);
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
+{
+ InSequence dummy;
+ expectRedisAsyncCommandArgv_SaveCb();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAck();
+ savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
+ dispatcher->disableCommandCallbacks();
+ expectRedisAsyncCommandArgv_SaveCb();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ expectAckNotCalled();
+ savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
+{
+ InSequence dummy;
+ dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::disconnectCallback,
+ this));
+ expectDisconnectCallback();
+ expectArmConnectionRetryTimer();
+ disconnected(&ac, 0);
+ expectDisarmConnectionRetryTimer();
+ expectCommandListQuery();
+ connected(&ac, 0); // restore connection to meet destructor expectations
+}
+
+TEST_F(AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
+{
+ InSequence dummy;
+ redisCallbackFn* savedCb;
+ void* savedPd;
+ Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+ expectCommandListQuery();
+ connected(&ac, 0);
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ EXPECT_CALL(*this, ack(std::error_code(), _))
+ .Times(3);
+ redisReply rr;
+ rr.type = REDIS_REPLY_NIL;
+ savedCb(&ac, &rr, savedPd);
+ savedCb(&ac, &rr, savedPd);
+ savedCb(&ac, &rr, savedPd);
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
+{
+ redisCallbackFn* savedCb;
+ void* savedPd;
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([this, &savedCb, &savedPd](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ cb(ac, &redisReplyBuilder.buildNilReply(), pd);
+ return REDIS_OK;
+ }));
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDeathTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}
+
+TEST_F(AsyncHiredisCommandDispatcherDeathTest, TooManyRepliesAborts)
+{
+ InSequence dummy;
+ redisCallbackFn* savedCb;
+ void* savedPd;
+ Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
+ EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
+ .Times(1)
+ .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
+ int, const char**, const size_t*)
+ {
+ savedCb = cb;
+ savedPd = pd;
+ return REDIS_OK;
+ }));
+ expectAck();
+ dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ defaultNamespace,
+ contents);
+ savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd);
+ EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
+}