Add Redis Sentinel based database discovery
[ric-plt/sdl.git] / tst / asynchiredisclustercommanddispatcher_test.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include <type_traits>
18 #include <memory>
19 #include <cstring>
20 #include <sys/epoll.h>
21 #include <sys/timerfd.h>
22 #include <sys/eventfd.h>
23 #include <arpa/inet.h>
24 #include <gtest/gtest.h>
25 #include <async.h>
26 #include "private/createlogger.hpp"
27 #include "private/error.hpp"
28 #include "private/logger.hpp"
29 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
30 #include "private/redis/redisgeneral.hpp"
31 #include "private/redis/reply.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/tst/hiredisclustersystemmock.hpp"
34 #include "private/timer.hpp"
35 #include "private/tst/contentsbuildermock.hpp"
36 #include "private/tst/enginemock.hpp"
37 #include "private/tst/hiredisclusterepolladaptermock.hpp"
38 #include "private/tst/redisreplybuilder.hpp"
39
40 using namespace shareddatalayer;
41 using namespace shareddatalayer::redis;
42 using namespace shareddatalayer::tst;
43 using namespace testing;
44
45 namespace
46 {
47     class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
48     {
49     public:
50         std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
51         StrictMock<EngineMock> engineMock;
52         HiredisClusterSystemMock hiredisClusterSystemMock;
53         std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
54         redisClusterAsyncContext acc;
55         redisAsyncContext ac;
56         int hiredisFd;
57         std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
58         void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
59         void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
60         Timer::Callback savedConnectionRetryTimerCallback;
61         Timer::Duration expectedRetryTimerDuration;
62         Contents contents;
63         Contents clusterConnectionSetupContents;
64         RedisReplyBuilder redisReplyBuilder;
65         const AsyncConnection::Namespace defaultNamespace;
66         std::shared_ptr<Logger> logger;
67
68         AsyncHiredisClusterCommandDispatcherBaseTest():
69             contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
70             adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
71             acc { },
72             ac { },
73             hiredisFd(3),
74             connected(nullptr),
75             disconnected(nullptr),
76             expectedRetryTimerDuration(std::chrono::seconds(1)),
77             contents { { "CMD", "key1", "value1", "key2", "value2" },
78                        { 3, 4, 6, 4, 6 } },
79             redisReplyBuilder { },
80             defaultNamespace("namespace"),
81             logger(createLogger(SDL_LOG_PREFIX))
82         {
83         }
84
85         virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
86         {
87         }
88
89         MOCK_METHOD0(connectAck, void());
90
91         MOCK_METHOD0(disconnectCallback, void());
92
93         MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
94
95         void expectationsUntilConnect()
96         {
97             expectationsUntilConnect(acc);
98         }
99
100         void expectationsUntilConnect(redisClusterAsyncContext& acc)
101         {
102             expectRedisClusterAsyncConnect(acc);
103         }
104
105         void expectRedisClusterAsyncConnect()
106         {
107             expectRedisClusterAsyncConnect(acc);
108         }
109
110         void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
111         {
112             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
113                                                                            HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
114                 .Times(1)
115                 .WillOnce(InvokeWithoutArgs([this, &acc]()
116                                             {
117                                                 return &acc;
118                                             }));
119         }
120
121         void expectRedisClusterAsyncConnectReturnNullptr()
122         {
123             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
124                                                                            HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
125                 .Times(1)
126                 .WillOnce(InvokeWithoutArgs([this]()
127                                             {
128                                                 return nullptr;
129                                             }));
130         }
131
132         void expectRedisClusterAsyncSetConnectCallback()
133         {
134             expectRedisClusterAsyncSetConnectCallback(acc);
135         }
136
137         void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
138         {
139             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
140                 .Times(1)
141                 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
142                                  {
143                                      connected = cb;
144                                      return REDIS_OK;
145                                  }));
146         }
147
148         void expectRedisClusterAsyncSetDisconnectCallback()
149         {
150             expectRedisClusterAsyncSetDisconnectCallback(acc);
151         }
152
153         void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
154         {
155             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
156                 .Times(1)
157                 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
158                                  {
159                                      disconnected = cb;
160                                      return REDIS_OK;
161                                  }));
162         }
163
164         void expectCommandListQuery()
165         {
166             expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
167         }
168
169         void expectCommandListQueryReturnError()
170         {
171             expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
172         }
173
174         void expectAdapterSetup()
175         {
176             expectAdapterSetup(acc);
177         }
178
179         void expectAdapterSetup(redisClusterAsyncContext& acc)
180         {
181             EXPECT_CALL(*adapterMock, setup(&acc))
182                 .Times(1);
183         }
184
185         void expectAdapterDetach()
186         {
187             EXPECT_CALL(*adapterMock, detach(&ac))
188                 .Times(1);
189         }
190
191         void expectConnectAck()
192         {
193             EXPECT_CALL(*this, connectAck())
194                 .Times(1);
195         }
196
197         void expectDisconnectCallback()
198         {
199             EXPECT_CALL(*this, disconnectCallback())
200                 .Times(1);
201         }
202
203         void expectRedisClusterAsyncFree()
204         {
205             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
206                 .Times(1);
207         }
208
209         void expectRedisClusterAsyncDisconnect()
210         {
211             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
212                 .Times(1);
213         }
214
215         void verifyAckErrorReply(const Reply& reply)
216         {
217             EXPECT_EQ(Reply::Type::NIL, reply.getType());
218             EXPECT_EQ(0, reply.getInteger());
219             EXPECT_TRUE(reply.getString()->str.empty());
220             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
221             EXPECT_TRUE(reply.getArray()->empty());
222         }
223
224         void expectAckError()
225         {
226             EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
227                 .Times(1)
228                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
229                                  {
230                                      verifyAckErrorReply(reply);
231                                  }));
232         }
233
234         void expectAckError(const std::error_code& ec)
235         {
236             EXPECT_CALL(*this, ack(ec, _))
237                 .Times(1)
238                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
239                                  {
240                                      verifyAckErrorReply(reply);
241                                  }));
242         }
243
244         void expectArmConnectionRetryTimer()
245         {
246             EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
247                 .Times(1)
248                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
249         }
250
251         void expectDisarmConnectionRetryTimer()
252         {
253             EXPECT_CALL(engineMock, disarmTimer(_))
254                 .Times(1);
255         }
256
257         void expectRedisClusterAsyncCommandArgv(redisReply& rr)
258         {
259             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
260                 .Times(1)
261                 .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
262                                        int, const char**, const size_t*)
263                                  {
264                                      cb(acc, &rr, pd);
265                                      return REDIS_OK;
266                                  }));
267         }
268
269         void expectAck()
270         {
271             EXPECT_CALL(*this, ack(std::error_code(), _))
272                 .Times(1);
273         }
274
275         void expectReplyError(const std::string& msg)
276         {
277             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
278                 .Times(1)
279                 .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
280                                     int, int, const char**, const size_t*)
281                                  {
282                                      cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
283                                      return REDIS_OK;
284                                  }));
285         }
286
287         void expectContextError(int code)
288         {
289             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
290                 .Times(1)
291                 .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
292                                         int, const char**, const size_t*)
293                                  {
294                                      acc->err = code;
295                                      cb(acc, nullptr, pd);
296                                      return REDIS_OK;
297                                  }));
298         }
299
300         void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
301         {
302             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
303                 .Times(1)
304                 .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
305                                  {
306                                      cb(acc, &redisReplyBuilder.buildNilReply(), pd);
307                                  }));
308         }
309
310         void expectAckNotCalled()
311         {
312             EXPECT_CALL(*this, ack(_,_))
313                 .Times(0);
314         }
315
316         void expectionsForSuccessfullConnectionSetup()
317         {
318             expectationsUntilConnect();
319             expectAdapterSetup();
320             expectRedisClusterAsyncSetConnectCallback();
321             expectRedisClusterAsyncSetDisconnectCallback();
322             expectCommandListQuery();
323         }
324
325         void callConnectionRetryTimerCallback()
326         {
327             ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
328             savedConnectionRetryTimerCallback();
329         }
330     };
331
332     class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
333     {
334     public:
335         AsyncHiredisClusterCommandDispatcherDisconnectedTest()
336         {
337             InSequence dummy;
338             expectationsUntilConnect();
339             expectAdapterSetup();
340             expectRedisClusterAsyncSetConnectCallback();
341             expectRedisClusterAsyncSetDisconnectCallback();
342             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
343                 .Times(1);
344             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
345                                                                       defaultNamespace,
346                                                                       { { "addr1", 111 }, { "addr2", 222 } },
347                                                                       contentsBuilderMock,
348                                                                       false,
349                                                                       hiredisClusterSystemMock,
350                                                                       adapterMock,
351                                                                       logger));
352         }
353
354         ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
355         {
356         }
357     };
358
359     class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
360     {
361     public:
362         AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
363         {
364             InSequence dummy;
365             expectionsForSuccessfullConnectionSetup();
366             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
367                                                                       defaultNamespace,
368                                                                       { { "addr1", 111 }, { "addr2", 222 } },
369                                                                       contentsBuilderMock,
370                                                                       true,
371                                                                       hiredisClusterSystemMock,
372                                                                       adapterMock,
373                                                                       logger));
374         }
375
376         ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
377         {
378             expectRedisClusterAsyncFree();
379         }
380     };
381
382     class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
383     {
384     public:
385         redisClusterCallbackFn* savedCb;
386         void* savedPd;
387
388         AsyncHiredisClusterCommandDispatcherConnectedTest():
389             savedCb(nullptr),
390             savedPd(nullptr)
391         {
392             InSequence dummy;
393             expectionsForSuccessfullConnectionSetup();
394             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
395                                                                       defaultNamespace,
396                                                                       { { "addr1", 111 }, { "addr2", 222 } },
397                                                                       contentsBuilderMock,
398                                                                       false,
399                                                                       hiredisClusterSystemMock,
400                                                                       adapterMock,
401                                                                       logger));
402             connected(&acc, &ac, 0);
403         }
404
405         ~AsyncHiredisClusterCommandDispatcherConnectedTest()
406         {
407             expectRedisClusterAsyncFree();
408         }
409
410         void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
411         {
412             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
413                 .Times(1)
414                 .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
415                                         const char*, int, int, const char**, const size_t*)
416                                  {
417                                      savedCb = cb;
418                                      savedPd = pd;
419                                      return REDIS_OK;
420                                  }));
421         }
422     };
423
424     using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
425 }
426
427 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
428 {
429     EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
430     EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
431 }
432
433 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
434 {
435     EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
436 }
437
438 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
439 {
440     Engine::Callback storedCallback;
441     EXPECT_CALL(engineMock, postCallback(_))
442         .Times(1)
443         .WillOnce(SaveArg<0>(&storedCallback));
444     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
445                                         this,
446                                         std::placeholders::_1,
447                                         std::placeholders::_2),
448                               defaultNamespace,
449                               { });
450     expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
451     storedCallback();
452 }
453
454 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
455 {
456     InSequence dummy;
457     acc.err = 123;
458     expectationsUntilConnect();
459     expectArmConnectionRetryTimer();
460     expectDisarmConnectionRetryTimer();
461
462     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
463                                                               defaultNamespace,
464                                                               { { "addr1", 111 }, { "addr2", 222 } },
465                                                               contentsBuilderMock,
466                                                               false,
467                                                               hiredisClusterSystemMock,
468                                                               adapterMock,
469                                                               logger));
470 }
471
472 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
473 {
474     InSequence dummy;
475     expectRedisClusterAsyncConnectReturnNullptr();
476     expectArmConnectionRetryTimer();
477     expectDisarmConnectionRetryTimer();
478
479     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
480                                                               defaultNamespace,
481                                                               { { "addr1", 111 }, { "addr2", 222 } },
482                                                               contentsBuilderMock,
483                                                               false,
484                                                               hiredisClusterSystemMock,
485                                                               adapterMock,
486                                                               logger));
487 }
488
489 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
490 {
491     InSequence dummy;
492     Engine::Callback storedCallback;
493     expectationsUntilConnect();
494     expectAdapterSetup();
495     expectRedisClusterAsyncSetConnectCallback();
496     expectRedisClusterAsyncSetDisconnectCallback();
497     expectCommandListQueryReturnError();
498     expectArmConnectionRetryTimer();
499
500     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
501                                                               defaultNamespace,
502                                                               { { "addr1", 111 }, { "addr2", 222 } },
503                                                               contentsBuilderMock,
504                                                               false,
505                                                               hiredisClusterSystemMock,
506                                                               adapterMock,
507                                                               logger));
508
509     expectDisarmConnectionRetryTimer();
510 }
511
512 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
513 {
514     InSequence dummy;
515     expectRedisClusterAsyncConnectReturnNullptr();
516     expectArmConnectionRetryTimer();
517
518     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
519                                                               defaultNamespace,
520                                                               { { "addr1", 111 }, { "addr2", 222 } },
521                                                               contentsBuilderMock,
522                                                               false,
523                                                               hiredisClusterSystemMock,
524                                                               adapterMock,
525                                                               logger));
526
527     expectionsForSuccessfullConnectionSetup();
528     expectRedisClusterAsyncFree();
529
530     callConnectionRetryTimerCallback();
531 }
532
533 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
534 {
535     Engine::Callback storedCallback;
536     EXPECT_CALL(engineMock, postCallback(_))
537         .Times(1)
538         .WillOnce(SaveArg<0>(&storedCallback));
539     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
540                                              this));
541     expectConnectAck();
542     storedCallback();
543 }
544
545 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
546 {
547     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
548         .Times(1)
549         .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
550                                 int keylen, int argc, const char** argv, const size_t* argvlen)
551                          {
552                              EXPECT_STREQ(defaultNamespace.c_str(), key);
553                              EXPECT_EQ(9, keylen);
554                              EXPECT_EQ((int)contents.stack.size(), argc);
555                              EXPECT_EQ(contents.sizes[0], argvlen[0]);
556                              EXPECT_EQ(contents.sizes[1], argvlen[1]);
557                              EXPECT_EQ(contents.sizes[2], argvlen[2]);
558                              EXPECT_EQ(contents.sizes[3], argvlen[3]);
559                              EXPECT_EQ(contents.sizes[4], argvlen[4]);
560                              EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
561                              EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
562                              EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
563                              EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
564                              EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
565                              cb(acc, &redisReplyBuilder.buildNilReply(), pd);
566                              return REDIS_OK;
567                          }));
568     expectAck();
569     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
570                                         this,
571                                         std::placeholders::_1,
572                                         std::placeholders::_2),
573                               defaultNamespace,
574                               contents);
575 }
576
577 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
578 {
579     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
580     EXPECT_CALL(*this, ack(std::error_code(), _))
581         .Times(1)
582         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
583                          {
584                              EXPECT_EQ(Reply::Type::NIL, reply.getType());
585                              EXPECT_EQ(0, reply.getInteger());
586                              EXPECT_TRUE(reply.getString()->str.empty());
587                              EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
588                              EXPECT_TRUE(reply.getArray()->empty());
589                          }));
590     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
591                                         this,
592                                         std::placeholders::_1,
593                                         std::placeholders::_2),
594                               defaultNamespace,
595                               contents);
596 }
597
598 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
599 {
600     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
601     EXPECT_CALL(*this, ack(std::error_code(), _))
602         .Times(1)
603         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
604                          {
605                              auto expected(redisReplyBuilder.buildIntegerReply());
606                              EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
607                              EXPECT_EQ(expected.integer, reply.getInteger());
608                          }));
609     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
610                                         this,
611                                         std::placeholders::_1,
612                                         std::placeholders::_2),
613                               defaultNamespace,
614                               contents);
615 }
616
617 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
618 {
619     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
620     EXPECT_CALL(*this, ack(std::error_code(), _))
621         .Times(1)
622         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
623                          {
624                              auto expected(redisReplyBuilder.buildStatusReply());
625                              EXPECT_EQ(Reply::Type::STATUS, reply.getType());
626                              EXPECT_EQ(expected.len, reply.getString()->len);
627                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
628                          }));
629     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
630                                         this,
631                                         std::placeholders::_1,
632                                         std::placeholders::_2),
633                               defaultNamespace,
634                               contents);
635 }
636
637 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
638 {
639     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
640     EXPECT_CALL(*this, ack(std::error_code(), _))
641         .Times(1)
642         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
643                          {
644                              auto expected(redisReplyBuilder.buildStringReply());
645                              EXPECT_EQ(Reply::Type::STRING, reply.getType());
646                              EXPECT_EQ(expected.len, reply.getString()->len);
647                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
648                          }));
649     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
650                                         this,
651                                         std::placeholders::_1,
652                                         std::placeholders::_2),
653                               defaultNamespace,
654                               contents);
655 }
656
657 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
658 {
659     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
660     EXPECT_CALL(*this, ack(std::error_code(), _))
661         .Times(1)
662         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
663                          {
664                              auto array(reply.getArray());
665                              EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
666                              EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
667                              EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
668                          }));
669     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
670                                         this,
671                                         std::placeholders::_1,
672                                         std::placeholders::_2),
673                               defaultNamespace,
674                               contents);
675 }
676
677 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
678 {
679     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
680         .Times(1)
681         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
682                             const char**, const size_t*)
683                          {
684                              acc->err = REDIS_ERR;
685                              return REDIS_ERR;
686                          }));
687     Engine::Callback storedCallback;
688     EXPECT_CALL(engineMock, postCallback(_))
689         .Times(1)
690         .WillOnce(SaveArg<0>(&storedCallback));
691     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
692                                         this,
693                                         std::placeholders::_1,
694                                         std::placeholders::_2),
695                               defaultNamespace,
696                               contents);
697     expectAckError();
698     storedCallback();
699 }
700
701 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
702 {
703     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
704         .Times(1)
705         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
706                             const char**, const size_t*)
707                          {
708                              cb(acc, nullptr, pd);
709                              return REDIS_OK;
710                          }));
711     expectAckError();
712     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
713                                         this,
714                                         std::placeholders::_1,
715                                         std::placeholders::_2),
716                               defaultNamespace,
717                               contents);
718 }
719
720 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
721 {
722     expectReplyError("LOADING Redis is loading the dataset in memory");
723     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
724         .Times(1);
725     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
726                                         this,
727                                         std::placeholders::_1,
728                                         std::placeholders::_2),
729                               defaultNamespace,
730                               contents);
731 }
732
733 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
734 {
735     //SDL checks only that reply starts with CLUSTERDOWN string
736     expectReplyError("CLUSTERDOWN");
737     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
738         .Times(1);
739     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
740                                         this,
741                                         std::placeholders::_1,
742                                         std::placeholders::_2),
743                                         defaultNamespace,
744                                         contents);
745
746     expectReplyError("CLUSTERDOWN The cluster is down");
747     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
748         .Times(1);
749     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
750                                         this,
751                                         std::placeholders::_1,
752                                         std::placeholders::_2),
753                                         defaultNamespace,
754                                         contents);
755
756     expectReplyError("CLUSTERDOW");
757     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
758         .Times(1);
759     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
760                                         this,
761                                         std::placeholders::_1,
762                                         std::placeholders::_2),
763                                         defaultNamespace,
764                                         contents);
765 }
766
767 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
768 {
769     expectReplyError("ERR Protocol error: invalid bulk length");
770     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
771         .Times(1);
772     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
773                                         this,
774                                         std::placeholders::_1,
775                                         std::placeholders::_2),
776                                         defaultNamespace,
777                                         contents);
778 }
779
780 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
781 {
782     expectReplyError("something sinister");
783     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
784         .Times(1);
785     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
786                                         this,
787                                         std::placeholders::_1,
788                                         std::placeholders::_2),
789                                         defaultNamespace,
790                                         contents);
791 }
792
793 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
794 {
795     expectReplyError("");
796     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
797         .Times(1);
798     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
799                                         this,
800                                         std::placeholders::_1,
801                                         std::placeholders::_2),
802                                         defaultNamespace,
803                                         contents);
804 }
805
806 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
807 {
808     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
809         .Times(1)
810         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
811                             const char**, const size_t*)
812                          {
813                              acc->err = REDIS_ERR_IO;
814                              errno = EINVAL;
815                              cb(acc, nullptr, pd);
816                              return REDIS_OK;
817                          }));
818     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
819         .Times(1);
820     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
821                                         this,
822                                         std::placeholders::_1,
823                                         std::placeholders::_2),
824                                         defaultNamespace,
825                                         contents);
826 }
827
828 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
829 {
830     expectContextError(REDIS_ERR_EOF);
831     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
832         .Times(1);
833     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
834                                         this,
835                                         std::placeholders::_1,
836                                         std::placeholders::_2),
837                                         defaultNamespace,
838                                         contents);
839 }
840
841 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
842 {
843     expectContextError(REDIS_ERR_PROTOCOL);
844     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
845         .Times(1);
846     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
847                                         this,
848                                         std::placeholders::_1,
849                                         std::placeholders::_2),
850                                         defaultNamespace,
851                                         contents);
852 }
853
854 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
855 {
856     expectContextError(REDIS_ERR_OOM);
857     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
858         .Times(1);
859     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
860                                         this,
861                                         std::placeholders::_1,
862                                         std::placeholders::_2),
863                                         defaultNamespace,
864                                         contents);
865 }
866
867 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
868 {
869     expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
870     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
871         .Times(1);
872     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
873                                         this,
874                                         std::placeholders::_1,
875                                         std::placeholders::_2),
876                                         defaultNamespace,
877                                         contents);
878 }
879
880 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
881 {
882     expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
883     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
884         .Times(1);
885     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
886                                         this,
887                                         std::placeholders::_1,
888                                         std::placeholders::_2),
889                                         defaultNamespace,
890                                         contents);
891 }
892
893 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
894 {
895     expectContextError(REDIS_ERR_OTHER);
896     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
897         .Times(1);
898     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
899                                         this,
900                                         std::placeholders::_1,
901                                         std::placeholders::_2),
902                                         defaultNamespace,
903                                         contents);
904 }
905
906 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
907 {
908     InSequence dummy;
909     expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
910     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
911                                         this,
912                                         std::placeholders::_1,
913                                         std::placeholders::_2),
914                                         defaultNamespace,
915                                         contents);
916     expectAck();
917     savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
918     dispatcher->disableCommandCallbacks();
919     expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
920     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
921                                         this,
922                                         std::placeholders::_1,
923                                         std::placeholders::_2),
924                                         defaultNamespace,
925                                         contents);
926     expectAckNotCalled();
927     savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
928 }
929
930 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
931 {
932     InSequence dummy;
933     expectAdapterDetach();
934     disconnected(&acc, &ac, 0);
935 }
936
937 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
938 {
939     InSequence dummy;
940     dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
941                                              this));
942     expectAdapterDetach();
943     expectDisconnectCallback();
944     disconnected(&acc, &ac, 0);
945 }
946
947 TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
948 {
949     InSequence dummy;
950     redisClusterCallbackFn* savedCb;
951     void* savedPd;
952     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
953         .Times(1)
954         .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
955                                               const char*, int, int, const char**, const size_t*)
956                          {
957                              savedCb = cb;
958                              savedPd = pd;
959                              return REDIS_OK;
960                          }));
961     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
962     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
963                                         this,
964                                         std::placeholders::_1,
965                                         std::placeholders::_2),
966                               defaultNamespace,
967                               contents);
968     EXPECT_CALL(*this, ack(std::error_code(), _))
969             .Times(3);
970     redisReply rr;
971     rr.type = REDIS_REPLY_NIL;
972     savedCb(&acc, &rr, savedPd);
973     savedCb(&acc, &rr, savedPd);
974     savedCb(&acc, &rr, savedPd);
975 }
976
977 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
978 {
979     InSequence dummy;
980     redisClusterCallbackFn* savedCb;
981     void* savedPd;
982     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
983         .Times(1)
984         .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
985                                                     const char*, int, int, const char**, const size_t*)
986                          {
987                              savedCb = cb;
988                              savedPd = pd;
989                              cb(acc, &redisReplyBuilder.buildNilReply(), pd);
990                              return REDIS_OK;
991                          }));
992     expectAck();
993     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
994                                         this,
995                                         std::placeholders::_1,
996                                         std::placeholders::_2),
997                                         defaultNamespace,
998                                         contents);
999     EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1000 }
1001
1002 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
1003 {
1004     InSequence dummy;
1005     redisClusterCallbackFn* savedCb;
1006     void* savedPd;
1007     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
1008         .Times(1)
1009         .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
1010                                               const char*, int, int, const char**, const size_t*)
1011                          {
1012                              savedCb = cb;
1013                              savedPd = pd;
1014                              return REDIS_OK;
1015                          }));
1016     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
1017     expectAck();
1018     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1019                                         this,
1020                                         std::placeholders::_1,
1021                                         std::placeholders::_2),
1022                                         defaultNamespace,
1023                                         contents);
1024     savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
1025     EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1026 }