X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=tst%2Fasynchiredisclustercommanddispatcher_test.cpp;fp=tst%2Fasynchiredisclustercommanddispatcher_test.cpp;h=14421f85e3b6402759346bc2246622300a6ba3c1;hb=ef2bf51d04aaf01fa0cabdcaf905b23423067662;hp=0000000000000000000000000000000000000000;hpb=edc9b96a441194b571e8d55ec1603b5be0ea52eb;p=ric-plt%2Fsdl.git diff --git a/tst/asynchiredisclustercommanddispatcher_test.cpp b/tst/asynchiredisclustercommanddispatcher_test.cpp new file mode 100644 index 0000000..14421f8 --- /dev/null +++ b/tst/asynchiredisclustercommanddispatcher_test.cpp @@ -0,0 +1,1026 @@ +/* + Copyright (c) 2018-2019 Nokia. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "private/createlogger.hpp" +#include "private/error.hpp" +#include "private/logger.hpp" +#include "private/redis/asynchiredisclustercommanddispatcher.hpp" +#include "private/redis/redisgeneral.hpp" +#include "private/redis/reply.hpp" +#include "private/redis/contents.hpp" +#include "private/tst/hiredisclustersystemmock.hpp" +#include "private/timer.hpp" +#include "private/tst/contentsbuildermock.hpp" +#include "private/tst/enginemock.hpp" +#include "private/tst/hiredisclusterepolladaptermock.hpp" +#include "private/tst/redisreplybuilder.hpp" + +using namespace shareddatalayer; +using namespace shareddatalayer::redis; +using namespace shareddatalayer::tst; +using namespace testing; + +namespace +{ + class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test + { + public: + std::shared_ptr contentsBuilderMock; + StrictMock engineMock; + HiredisClusterSystemMock hiredisClusterSystemMock; + std::shared_ptr adapterMock; + redisClusterAsyncContext acc; + redisAsyncContext ac; + int hiredisFd; + std::unique_ptr dispatcher; + void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int); + void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int); + Timer::Callback savedConnectionRetryTimerCallback; + Timer::Duration expectedRetryTimerDuration; + Contents contents; + Contents clusterConnectionSetupContents; + RedisReplyBuilder redisReplyBuilder; + const AsyncConnection::Namespace defaultNamespace; + std::shared_ptr logger; + + AsyncHiredisClusterCommandDispatcherBaseTest(): + contentsBuilderMock(std::make_shared(AsyncConnection::SEPARATOR)), + adapterMock(std::make_shared(engineMock, hiredisClusterSystemMock)), + acc { }, + ac { }, + hiredisFd(3), + connected(nullptr), + disconnected(nullptr), + expectedRetryTimerDuration(std::chrono::seconds(1)), + contents { { "CMD", "key1", "value1", "key2", "value2" }, + { 3, 4, 6, 4, 6 } }, + redisReplyBuilder { }, + defaultNamespace("namespace"), + logger(createLogger(SDL_LOG_PREFIX)) + { + } + + virtual ~AsyncHiredisClusterCommandDispatcherBaseTest() + { + } + + MOCK_METHOD0(connectAck, void()); + + MOCK_METHOD0(disconnectCallback, void()); + + MOCK_METHOD2(ack, void(const std::error_code&, const Reply&)); + + void expectationsUntilConnect() + { + expectationsUntilConnect(acc); + } + + void expectationsUntilConnect(redisClusterAsyncContext& acc) + { + expectRedisClusterAsyncConnect(acc); + } + + void expectRedisClusterAsyncConnect() + { + expectRedisClusterAsyncConnect(acc); + } + + void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"), + HIRCLUSTER_FLAG_ROUTE_USE_SLOTS)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this, &acc]() + { + return &acc; + })); + } + + void expectRedisClusterAsyncConnectReturnNullptr() + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"), + HIRCLUSTER_FLAG_ROUTE_USE_SLOTS)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this]() + { + return nullptr; + })); + } + + void expectRedisClusterAsyncSetConnectCallback() + { + expectRedisClusterAsyncSetConnectCallback(acc); + } + + void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _)) + .Times(1) + .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb) + { + connected = cb; + return REDIS_OK; + })); + } + + void expectRedisClusterAsyncSetDisconnectCallback() + { + expectRedisClusterAsyncSetDisconnectCallback(acc); + } + + void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _)) + .Times(1) + .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb) + { + disconnected = cb; + return REDIS_OK; + })); + } + + void expectCommandListQuery() + { + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply()); + } + + void expectCommandListQueryReturnError() + { + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery")); + } + + void expectAdapterSetup() + { + expectAdapterSetup(acc); + } + + void expectAdapterSetup(redisClusterAsyncContext& acc) + { + EXPECT_CALL(*adapterMock, setup(&acc)) + .Times(1); + } + + void expectAdapterDetach() + { + EXPECT_CALL(*adapterMock, detach(&ac)) + .Times(1); + } + + void expectConnectAck() + { + EXPECT_CALL(*this, connectAck()) + .Times(1); + } + + void expectDisconnectCallback() + { + EXPECT_CALL(*this, disconnectCallback()) + .Times(1); + } + + void expectRedisClusterAsyncFree() + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc)) + .Times(1); + } + + void expectRedisClusterAsyncDisconnect() + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc)) + .Times(1); + } + + void verifyAckErrorReply(const Reply& reply) + { + EXPECT_EQ(Reply::Type::NIL, reply.getType()); + EXPECT_EQ(0, reply.getInteger()); + EXPECT_TRUE(reply.getString()->str.empty()); + EXPECT_EQ(static_cast(0), reply.getString()->len); + EXPECT_TRUE(reply.getArray()->empty()); + } + + void expectAckError() + { + EXPECT_CALL(*this, ack(Ne(std::error_code()), _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + verifyAckErrorReply(reply); + })); + } + + void expectAckError(const std::error_code& ec) + { + EXPECT_CALL(*this, ack(ec, _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + verifyAckErrorReply(reply); + })); + } + + void expectArmConnectionRetryTimer() + { + EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _)) + .Times(1) + .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback)); + } + + void expectDisarmConnectionRetryTimer() + { + EXPECT_CALL(engineMock, disarmTimer(_)) + .Times(1); + } + + void expectRedisClusterAsyncCommandArgv(redisReply& rr) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, + int, const char**, const size_t*) + { + cb(acc, &rr, pd); + return REDIS_OK; + })); + } + + void expectAck() + { + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1); + } + + void expectReplyError(const std::string& msg) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, + int, int, const char**, const size_t*) + { + cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd); + return REDIS_OK; + })); + } + + void expectContextError(int code) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, + int, const char**, const size_t*) + { + acc->err = code; + cb(acc, nullptr, pd); + return REDIS_OK; + })); + } + + void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd) + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc)) + .Times(1) + .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc) + { + cb(acc, &redisReplyBuilder.buildNilReply(), pd); + })); + } + + void expectAckNotCalled() + { + EXPECT_CALL(*this, ack(_,_)) + .Times(0); + } + + void expectionsForSuccessfullConnectionSetup() + { + expectationsUntilConnect(); + expectAdapterSetup(); + expectRedisClusterAsyncSetConnectCallback(); + expectRedisClusterAsyncSetDisconnectCallback(); + expectCommandListQuery(); + } + + void callConnectionRetryTimerCallback() + { + ASSERT_NE(savedConnectionRetryTimerCallback, nullptr); + savedConnectionRetryTimerCallback(); + } + }; + + class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest + { + public: + AsyncHiredisClusterCommandDispatcherDisconnectedTest() + { + InSequence dummy; + expectationsUntilConnect(); + expectAdapterSetup(); + expectRedisClusterAsyncSetConnectCallback(); + expectRedisClusterAsyncSetDisconnectCallback(); + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1); + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); + } + + ~AsyncHiredisClusterCommandDispatcherDisconnectedTest() + { + } + }; + + class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest + { + public: + AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest() + { + InSequence dummy; + expectionsForSuccessfullConnectionSetup(); + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + true, + hiredisClusterSystemMock, + adapterMock, + logger)); + } + + ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest() + { + expectRedisClusterAsyncFree(); + } + }; + + class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest + { + public: + redisClusterCallbackFn* savedCb; + void* savedPd; + + AsyncHiredisClusterCommandDispatcherConnectedTest(): + savedCb(nullptr), + savedPd(nullptr) + { + InSequence dummy; + expectionsForSuccessfullConnectionSetup(); + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); + connected(&acc, &ac, 0); + } + + ~AsyncHiredisClusterCommandDispatcherConnectedTest() + { + expectRedisClusterAsyncFree(); + } + + void expectRedisClusterAsyncCommandArgvWithKey_SaveCb() + { + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd, + const char*, int, int, const char**, const size_t*) + { + savedCb = cb; + savedPd = pd; + return REDIS_OK; + })); + } + }; + + using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest; +} + +TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable) +{ + EXPECT_FALSE(std::is_copy_constructible::value); + EXPECT_FALSE(std::is_copy_assignable::value); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher) +{ + EXPECT_TRUE((std::is_base_of::value)); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected) +{ + Engine::Callback storedCallback; + EXPECT_CALL(engineMock, postCallback(_)) + .Times(1) + .WillOnce(SaveArg<0>(&storedCallback)); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + { }); + expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED); + storedCallback(); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer) +{ + InSequence dummy; + acc.err = 123; + expectationsUntilConnect(); + expectArmConnectionRetryTimer(); + expectDisarmConnectionRetryTimer(); + + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer) +{ + InSequence dummy; + expectRedisClusterAsyncConnectReturnNullptr(); + expectArmConnectionRetryTimer(); + expectDisarmConnectionRetryTimer(); + + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer) +{ + InSequence dummy; + Engine::Callback storedCallback; + expectationsUntilConnect(); + expectAdapterSetup(); + expectRedisClusterAsyncSetConnectCallback(); + expectRedisClusterAsyncSetDisconnectCallback(); + expectCommandListQueryReturnError(); + expectArmConnectionRetryTimer(); + + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); + + expectDisarmConnectionRetryTimer(); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer) +{ + InSequence dummy; + expectRedisClusterAsyncConnectReturnNullptr(); + expectArmConnectionRetryTimer(); + + dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock, + defaultNamespace, + { { "addr1", 111 }, { "addr2", 222 } }, + contentsBuilderMock, + false, + hiredisClusterSystemMock, + adapterMock, + logger)); + + expectionsForSuccessfullConnectionSetup(); + expectRedisClusterAsyncFree(); + + callConnectionRetryTimerCallback(); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected) +{ + Engine::Callback storedCallback; + EXPECT_CALL(engineMock, postCallback(_)) + .Times(1) + .WillOnce(SaveArg<0>(&storedCallback)); + dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck, + this)); + expectConnectAck(); + storedCallback(); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands) +{ + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key, + int keylen, int argc, const char** argv, const size_t* argvlen) + { + EXPECT_STREQ(defaultNamespace.c_str(), key); + EXPECT_EQ(9, keylen); + EXPECT_EQ((int)contents.stack.size(), argc); + EXPECT_EQ(contents.sizes[0], argvlen[0]); + EXPECT_EQ(contents.sizes[1], argvlen[1]); + EXPECT_EQ(contents.sizes[2], argvlen[2]); + EXPECT_EQ(contents.sizes[3], argvlen[3]); + EXPECT_EQ(contents.sizes[4], argvlen[4]); + EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0])); + EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1])); + EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2])); + EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3])); + EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4])); + cb(acc, &redisReplyBuilder.buildNilReply(), pd); + return REDIS_OK; + })); + expectAck(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply) +{ + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply()); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1) + .WillOnce(Invoke([](const std::error_code&, const Reply& reply) + { + EXPECT_EQ(Reply::Type::NIL, reply.getType()); + EXPECT_EQ(0, reply.getInteger()); + EXPECT_TRUE(reply.getString()->str.empty()); + EXPECT_EQ(static_cast(0), reply.getString()->len); + EXPECT_TRUE(reply.getArray()->empty()); + })); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply) +{ + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply()); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + auto expected(redisReplyBuilder.buildIntegerReply()); + EXPECT_EQ(Reply::Type::INTEGER, reply.getType()); + EXPECT_EQ(expected.integer, reply.getInteger()); + })); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply) +{ + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply()); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + auto expected(redisReplyBuilder.buildStatusReply()); + EXPECT_EQ(Reply::Type::STATUS, reply.getType()); + EXPECT_EQ(expected.len, reply.getString()->len); + EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len)); + })); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply) +{ + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply()); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + auto expected(redisReplyBuilder.buildStringReply()); + EXPECT_EQ(Reply::Type::STRING, reply.getType()); + EXPECT_EQ(expected.len, reply.getString()->len); + EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len)); + })); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply) +{ + expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply()); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(1) + .WillOnce(Invoke([this](const std::error_code&, const Reply& reply) + { + auto array(reply.getArray()); + EXPECT_EQ(Reply::Type::ARRAY, reply.getType()); + EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType()); + EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType()); + })); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors) +{ + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int, + const char**, const size_t*) + { + acc->err = REDIS_ERR; + return REDIS_ERR; + })); + Engine::Callback storedCallback; + EXPECT_CALL(engineMock, postCallback(_)) + .Times(1) + .WillOnce(SaveArg<0>(&storedCallback)); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + expectAckError(); + storedCallback(); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors) +{ + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int, + const char**, const size_t*) + { + cb(acc, nullptr, pd); + return REDIS_OK; + })); + expectAckError(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply) +{ + expectReplyError("LOADING Redis is loading the dataset in memory"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply) +{ + //SDL checks only that reply starts with CLUSTERDOWN string + expectReplyError("CLUSTERDOWN"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + + expectReplyError("CLUSTERDOWN The cluster is down"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + + expectReplyError("CLUSTERDOW"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply) +{ + expectReplyError("ERR Protocol error: invalid bulk length"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError) +{ + expectReplyError("something sinister"); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError) +{ + expectReplyError(""); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext) +{ + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int, + const char**, const size_t*) + { + acc->err = REDIS_ERR_IO; + errno = EINVAL; + cb(acc, nullptr, pd); + return REDIS_OK; + })); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext) +{ + expectContextError(REDIS_ERR_EOF); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext) +{ + expectContextError(REDIS_ERR_PROTOCOL); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext) +{ + expectContextError(REDIS_ERR_OOM); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext) +{ + expectContextError(CLUSTER_ERROR_NOT_CONNECTED); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext) +{ + expectContextError(CLUSTER_ERROR_CONNECTION_LOST); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError) +{ + expectContextError(REDIS_ERR_OTHER); + EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _)) + .Times(1); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled) +{ + InSequence dummy; + expectRedisClusterAsyncCommandArgvWithKey_SaveCb(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + expectAck(); + savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd); + dispatcher->disableCommandCallbacks(); + expectRedisClusterAsyncCommandArgvWithKey_SaveCb(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + expectAckNotCalled(); + savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter) +{ + InSequence dummy; + expectAdapterDetach(); + disconnected(&acc, &ac, 0); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled) +{ + InSequence dummy; + dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback, + this)); + expectAdapterDetach(); + expectDisconnectCallback(); + disconnected(&acc, &ac, 0); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand) +{ + InSequence dummy; + redisClusterCallbackFn* savedCb; + void* savedPd; + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd, + const char*, int, int, const char**, const size_t*) + { + savedCb = cb; + savedPd = pd; + return REDIS_OK; + })); + Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } }); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + EXPECT_CALL(*this, ack(std::error_code(), _)) + .Times(3); + redisReply rr; + rr.type = REDIS_REPLY_NIL; + savedCb(&acc, &rr, savedPd); + savedCb(&acc, &rr, savedPd); + savedCb(&acc, &rr, savedPd); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb) +{ + InSequence dummy; + redisClusterCallbackFn* savedCb; + void* savedPd; + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, + const char*, int, int, const char**, const size_t*) + { + savedCb = cb; + savedPd = pd; + cb(acc, &redisReplyBuilder.buildNilReply(), pd); + return REDIS_OK; + })); + expectAck(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), ""); +} + +TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts) +{ + InSequence dummy; + redisClusterCallbackFn* savedCb; + void* savedPd; + EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _)) + .Times(1) + .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd, + const char*, int, int, const char**, const size_t*) + { + savedCb = cb; + savedPd = pd; + return REDIS_OK; + })); + Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } }); + expectAck(); + dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack, + this, + std::placeholders::_1, + std::placeholders::_2), + defaultNamespace, + contents); + savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd); + EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), ""); +}