2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include <type_traits>
25 #include <sys/epoll.h>
26 #include <sys/timerfd.h>
27 #include <sys/eventfd.h>
28 #include <arpa/inet.h>
29 #include <gtest/gtest.h>
31 #include "private/createlogger.hpp"
32 #include "private/error.hpp"
33 #include "private/logger.hpp"
34 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
35 #include "private/redis/redisgeneral.hpp"
36 #include "private/redis/reply.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/tst/hiredisclustersystemmock.hpp"
39 #include "private/timer.hpp"
40 #include "private/tst/contentsbuildermock.hpp"
41 #include "private/tst/enginemock.hpp"
42 #include "private/tst/hiredisclusterepolladaptermock.hpp"
43 #include "private/tst/redisreplybuilder.hpp"
45 using namespace shareddatalayer;
46 using namespace shareddatalayer::redis;
47 using namespace shareddatalayer::tst;
48 using namespace testing;
52 class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
55 std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
56 StrictMock<EngineMock> engineMock;
57 HiredisClusterSystemMock hiredisClusterSystemMock;
58 std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
59 redisClusterAsyncContext acc;
62 std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
63 void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
64 void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
65 Timer::Callback savedConnectionRetryTimerCallback;
66 Timer::Duration expectedRetryTimerDuration;
68 Contents clusterConnectionSetupContents;
69 RedisReplyBuilder redisReplyBuilder;
70 const AsyncConnection::Namespace defaultNamespace;
71 std::shared_ptr<Logger> logger;
73 AsyncHiredisClusterCommandDispatcherBaseTest():
74 contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
75 adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
80 disconnected(nullptr),
81 expectedRetryTimerDuration(std::chrono::seconds(1)),
82 contents { { "CMD", "key1", "value1", "key2", "value2" },
84 redisReplyBuilder { },
85 defaultNamespace("namespace"),
86 logger(createLogger(SDL_LOG_PREFIX))
90 virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
94 MOCK_METHOD0(connectAck, void());
96 MOCK_METHOD0(disconnectCallback, void());
98 MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
100 void expectationsUntilConnect()
102 expectationsUntilConnect(acc);
105 void expectationsUntilConnect(redisClusterAsyncContext& acc)
107 expectRedisClusterAsyncConnect(acc);
110 void expectRedisClusterAsyncConnect()
112 expectRedisClusterAsyncConnect(acc);
115 void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
117 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
118 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
120 .WillOnce(InvokeWithoutArgs([this, &acc]()
126 void expectRedisClusterAsyncConnectReturnNullptr()
128 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
129 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
131 .WillOnce(InvokeWithoutArgs([this]()
137 void expectRedisClusterAsyncSetConnectCallback()
139 expectRedisClusterAsyncSetConnectCallback(acc);
142 void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
144 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
146 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
153 void expectRedisClusterAsyncSetDisconnectCallback()
155 expectRedisClusterAsyncSetDisconnectCallback(acc);
158 void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
160 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
162 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
169 void expectCommandListQuery()
171 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
174 void expectCommandListQueryReturnError()
176 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
179 void expectAdapterSetup()
181 expectAdapterSetup(acc);
184 void expectAdapterSetup(redisClusterAsyncContext& acc)
186 EXPECT_CALL(*adapterMock, setup(&acc))
190 void expectAdapterDetach()
192 EXPECT_CALL(*adapterMock, detach(&ac))
196 void expectConnectAck()
198 EXPECT_CALL(*this, connectAck())
202 void expectDisconnectCallback()
204 EXPECT_CALL(*this, disconnectCallback())
208 void expectRedisClusterAsyncFree()
210 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
214 void expectRedisClusterAsyncDisconnect()
216 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
220 void verifyAckErrorReply(const Reply& reply)
222 EXPECT_EQ(Reply::Type::NIL, reply.getType());
223 EXPECT_EQ(0, reply.getInteger());
224 EXPECT_TRUE(reply.getString()->str.empty());
225 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
226 EXPECT_TRUE(reply.getArray()->empty());
229 void expectAckError()
231 EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
233 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
235 verifyAckErrorReply(reply);
239 void expectAckError(const std::error_code& ec)
241 EXPECT_CALL(*this, ack(ec, _))
243 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
245 verifyAckErrorReply(reply);
249 void expectArmConnectionRetryTimer()
251 EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
253 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
256 void expectDisarmConnectionRetryTimer()
258 EXPECT_CALL(engineMock, disarmTimer(_))
262 void expectRedisClusterAsyncCommandArgv(redisReply& rr)
264 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
266 .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
267 int, const char**, const size_t*)
276 EXPECT_CALL(*this, ack(std::error_code(), _))
280 void expectReplyError(const std::string& msg)
282 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
284 .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
285 int, int, const char**, const size_t*)
287 cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
292 void expectContextError(int code)
294 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
296 .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
297 int, const char**, const size_t*)
300 cb(acc, nullptr, pd);
305 void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
307 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
309 .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
311 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
315 void expectAckNotCalled()
317 EXPECT_CALL(*this, ack(_,_))
321 void expectionsForSuccessfullConnectionSetup()
323 expectationsUntilConnect();
324 expectAdapterSetup();
325 expectRedisClusterAsyncSetConnectCallback();
326 expectRedisClusterAsyncSetDisconnectCallback();
327 expectCommandListQuery();
330 void callConnectionRetryTimerCallback()
332 ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
333 savedConnectionRetryTimerCallback();
337 class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
340 AsyncHiredisClusterCommandDispatcherDisconnectedTest()
343 expectationsUntilConnect();
344 expectAdapterSetup();
345 expectRedisClusterAsyncSetConnectCallback();
346 expectRedisClusterAsyncSetDisconnectCallback();
347 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
349 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
351 { { "addr1", 111 }, { "addr2", 222 } },
354 hiredisClusterSystemMock,
359 ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
364 class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
367 AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
370 expectionsForSuccessfullConnectionSetup();
371 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
373 { { "addr1", 111 }, { "addr2", 222 } },
376 hiredisClusterSystemMock,
381 ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
383 expectRedisClusterAsyncFree();
387 class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
390 redisClusterCallbackFn* savedCb;
393 AsyncHiredisClusterCommandDispatcherConnectedTest():
398 expectionsForSuccessfullConnectionSetup();
399 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
401 { { "addr1", 111 }, { "addr2", 222 } },
404 hiredisClusterSystemMock,
407 connected(&acc, &ac, 0);
410 ~AsyncHiredisClusterCommandDispatcherConnectedTest()
412 expectRedisClusterAsyncFree();
415 void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
417 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
419 .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
420 const char*, int, int, const char**, const size_t*)
429 using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
432 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
434 EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
435 EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
438 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
440 EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
443 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
445 Engine::Callback storedCallback;
446 EXPECT_CALL(engineMock, postCallback(_))
448 .WillOnce(SaveArg<0>(&storedCallback));
449 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
451 std::placeholders::_1,
452 std::placeholders::_2),
455 expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
459 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
463 expectationsUntilConnect();
464 expectArmConnectionRetryTimer();
465 expectDisarmConnectionRetryTimer();
467 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
469 { { "addr1", 111 }, { "addr2", 222 } },
472 hiredisClusterSystemMock,
477 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
480 expectRedisClusterAsyncConnectReturnNullptr();
481 expectArmConnectionRetryTimer();
482 expectDisarmConnectionRetryTimer();
484 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
486 { { "addr1", 111 }, { "addr2", 222 } },
489 hiredisClusterSystemMock,
494 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
497 Engine::Callback storedCallback;
498 expectationsUntilConnect();
499 expectAdapterSetup();
500 expectRedisClusterAsyncSetConnectCallback();
501 expectRedisClusterAsyncSetDisconnectCallback();
502 expectCommandListQueryReturnError();
503 expectArmConnectionRetryTimer();
505 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
507 { { "addr1", 111 }, { "addr2", 222 } },
510 hiredisClusterSystemMock,
514 expectDisarmConnectionRetryTimer();
517 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
520 expectRedisClusterAsyncConnectReturnNullptr();
521 expectArmConnectionRetryTimer();
523 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
525 { { "addr1", 111 }, { "addr2", 222 } },
528 hiredisClusterSystemMock,
532 expectionsForSuccessfullConnectionSetup();
533 expectRedisClusterAsyncFree();
535 callConnectionRetryTimerCallback();
538 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
540 Engine::Callback storedCallback;
541 EXPECT_CALL(engineMock, postCallback(_))
543 .WillOnce(SaveArg<0>(&storedCallback));
544 dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
550 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
552 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
554 .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
555 int keylen, int argc, const char** argv, const size_t* argvlen)
557 EXPECT_STREQ(defaultNamespace.c_str(), key);
558 EXPECT_EQ(9, keylen);
559 EXPECT_EQ((int)contents.stack.size(), argc);
560 EXPECT_EQ(contents.sizes[0], argvlen[0]);
561 EXPECT_EQ(contents.sizes[1], argvlen[1]);
562 EXPECT_EQ(contents.sizes[2], argvlen[2]);
563 EXPECT_EQ(contents.sizes[3], argvlen[3]);
564 EXPECT_EQ(contents.sizes[4], argvlen[4]);
565 EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
566 EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
567 EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
568 EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
569 EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
570 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
574 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
576 std::placeholders::_1,
577 std::placeholders::_2),
582 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
584 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
585 EXPECT_CALL(*this, ack(std::error_code(), _))
587 .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
589 EXPECT_EQ(Reply::Type::NIL, reply.getType());
590 EXPECT_EQ(0, reply.getInteger());
591 EXPECT_TRUE(reply.getString()->str.empty());
592 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
593 EXPECT_TRUE(reply.getArray()->empty());
595 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
597 std::placeholders::_1,
598 std::placeholders::_2),
603 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
605 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
606 EXPECT_CALL(*this, ack(std::error_code(), _))
608 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
610 auto expected(redisReplyBuilder.buildIntegerReply());
611 EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
612 EXPECT_EQ(expected.integer, reply.getInteger());
614 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
616 std::placeholders::_1,
617 std::placeholders::_2),
622 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
624 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
625 EXPECT_CALL(*this, ack(std::error_code(), _))
627 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
629 auto expected(redisReplyBuilder.buildStatusReply());
630 EXPECT_EQ(Reply::Type::STATUS, reply.getType());
631 EXPECT_EQ(expected.len, reply.getString()->len);
632 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
634 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
636 std::placeholders::_1,
637 std::placeholders::_2),
642 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
644 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
645 EXPECT_CALL(*this, ack(std::error_code(), _))
647 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
649 auto expected(redisReplyBuilder.buildStringReply());
650 EXPECT_EQ(Reply::Type::STRING, reply.getType());
651 EXPECT_EQ(expected.len, reply.getString()->len);
652 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
654 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
656 std::placeholders::_1,
657 std::placeholders::_2),
662 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
664 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
665 EXPECT_CALL(*this, ack(std::error_code(), _))
667 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
669 auto array(reply.getArray());
670 EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
671 EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
672 EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
674 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
676 std::placeholders::_1,
677 std::placeholders::_2),
682 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
684 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
686 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
687 const char**, const size_t*)
689 acc->err = REDIS_ERR;
692 Engine::Callback storedCallback;
693 EXPECT_CALL(engineMock, postCallback(_))
695 .WillOnce(SaveArg<0>(&storedCallback));
696 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
698 std::placeholders::_1,
699 std::placeholders::_2),
706 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
708 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
710 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
711 const char**, const size_t*)
713 cb(acc, nullptr, pd);
717 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
719 std::placeholders::_1,
720 std::placeholders::_2),
725 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
727 expectReplyError("LOADING Redis is loading the dataset in memory");
728 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
730 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
732 std::placeholders::_1,
733 std::placeholders::_2),
738 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
740 //SDL checks only that reply starts with CLUSTERDOWN string
741 expectReplyError("CLUSTERDOWN");
742 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
744 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
746 std::placeholders::_1,
747 std::placeholders::_2),
751 expectReplyError("CLUSTERDOWN The cluster is down");
752 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
754 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
756 std::placeholders::_1,
757 std::placeholders::_2),
761 expectReplyError("CLUSTERDOW");
762 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
764 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
766 std::placeholders::_1,
767 std::placeholders::_2),
772 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
774 expectReplyError("ERR Protocol error: invalid bulk length");
775 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
777 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
779 std::placeholders::_1,
780 std::placeholders::_2),
785 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
787 expectReplyError("something sinister");
788 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
790 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
792 std::placeholders::_1,
793 std::placeholders::_2),
798 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
800 expectReplyError("");
801 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
803 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
805 std::placeholders::_1,
806 std::placeholders::_2),
811 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
813 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
815 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
816 const char**, const size_t*)
818 acc->err = REDIS_ERR_IO;
820 cb(acc, nullptr, pd);
823 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
825 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
827 std::placeholders::_1,
828 std::placeholders::_2),
833 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
835 expectContextError(REDIS_ERR_EOF);
836 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
838 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
840 std::placeholders::_1,
841 std::placeholders::_2),
846 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
848 expectContextError(REDIS_ERR_PROTOCOL);
849 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
851 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
853 std::placeholders::_1,
854 std::placeholders::_2),
859 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
861 expectContextError(REDIS_ERR_OOM);
862 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
864 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
866 std::placeholders::_1,
867 std::placeholders::_2),
872 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
874 expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
875 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
877 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
879 std::placeholders::_1,
880 std::placeholders::_2),
885 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
887 expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
888 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
890 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
892 std::placeholders::_1,
893 std::placeholders::_2),
898 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
900 expectContextError(REDIS_ERR_OTHER);
901 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
903 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
905 std::placeholders::_1,
906 std::placeholders::_2),
911 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
914 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
915 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
917 std::placeholders::_1,
918 std::placeholders::_2),
922 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
923 dispatcher->disableCommandCallbacks();
924 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
925 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
927 std::placeholders::_1,
928 std::placeholders::_2),
931 expectAckNotCalled();
932 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
935 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
938 expectAdapterDetach();
939 disconnected(&acc, &ac, 0);
942 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
945 dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
947 expectAdapterDetach();
948 expectDisconnectCallback();
949 disconnected(&acc, &ac, 0);
952 TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
955 redisClusterCallbackFn* savedCb;
957 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
959 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
960 const char*, int, int, const char**, const size_t*)
966 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
967 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
969 std::placeholders::_1,
970 std::placeholders::_2),
973 EXPECT_CALL(*this, ack(std::error_code(), _))
976 rr.type = REDIS_REPLY_NIL;
977 savedCb(&acc, &rr, savedPd);
978 savedCb(&acc, &rr, savedPd);
979 savedCb(&acc, &rr, savedPd);
982 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
985 redisClusterCallbackFn* savedCb;
987 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
989 .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
990 const char*, int, int, const char**, const size_t*)
994 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
998 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1000 std::placeholders::_1,
1001 std::placeholders::_2),
1004 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1007 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
1010 redisClusterCallbackFn* savedCb;
1012 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
1014 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
1015 const char*, int, int, const char**, const size_t*)
1021 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
1023 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1025 std::placeholders::_1,
1026 std::placeholders::_2),
1029 savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
1030 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");