RIC:1060: Change in PTL
[ric-plt/sdl.git] / tst / asynchiredisclustercommanddispatcher_test.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include <type_traits>
23 #include <memory>
24 #include <cstring>
25 #include <sys/epoll.h>
26 #include <sys/timerfd.h>
27 #include <sys/eventfd.h>
28 #include <arpa/inet.h>
29 #include <gtest/gtest.h>
30 #include <async.h>
31 #include "private/createlogger.hpp"
32 #include "private/error.hpp"
33 #include "private/logger.hpp"
34 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
35 #include "private/redis/redisgeneral.hpp"
36 #include "private/redis/reply.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/tst/hiredisclustersystemmock.hpp"
39 #include "private/timer.hpp"
40 #include "private/tst/contentsbuildermock.hpp"
41 #include "private/tst/enginemock.hpp"
42 #include "private/tst/hiredisclusterepolladaptermock.hpp"
43 #include "private/tst/redisreplybuilder.hpp"
44
45 using namespace shareddatalayer;
46 using namespace shareddatalayer::redis;
47 using namespace shareddatalayer::tst;
48 using namespace testing;
49
50 namespace
51 {
52     class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
53     {
54     public:
55         std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
56         StrictMock<EngineMock> engineMock;
57         HiredisClusterSystemMock hiredisClusterSystemMock;
58         std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
59         redisClusterAsyncContext acc;
60         redisAsyncContext ac;
61         int hiredisFd;
62         std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
63         void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
64         void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
65         Timer::Callback savedConnectionRetryTimerCallback;
66         Timer::Duration expectedRetryTimerDuration;
67         Contents contents;
68         Contents clusterConnectionSetupContents;
69         RedisReplyBuilder redisReplyBuilder;
70         const AsyncConnection::Namespace defaultNamespace;
71         std::shared_ptr<Logger> logger;
72
73         AsyncHiredisClusterCommandDispatcherBaseTest():
74             contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
75             adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
76             acc { },
77             ac { },
78             hiredisFd(3),
79             connected(nullptr),
80             disconnected(nullptr),
81             expectedRetryTimerDuration(std::chrono::seconds(1)),
82             contents { { "CMD", "key1", "value1", "key2", "value2" },
83                        { 3, 4, 6, 4, 6 } },
84             redisReplyBuilder { },
85             defaultNamespace("namespace"),
86             logger(createLogger(SDL_LOG_PREFIX))
87         {
88         }
89
90         virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
91         {
92         }
93
94         MOCK_METHOD0(connectAck, void());
95
96         MOCK_METHOD0(disconnectCallback, void());
97
98         MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
99
100         void expectationsUntilConnect()
101         {
102             expectationsUntilConnect(acc);
103         }
104
105         void expectationsUntilConnect(redisClusterAsyncContext& acc)
106         {
107             expectRedisClusterAsyncConnect(acc);
108         }
109
110         void expectRedisClusterAsyncConnect()
111         {
112             expectRedisClusterAsyncConnect(acc);
113         }
114
115         void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
116         {
117             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
118                                                                            HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
119                 .Times(1)
120                 .WillOnce(InvokeWithoutArgs([this, &acc]()
121                                             {
122                                                 return &acc;
123                                             }));
124         }
125
126         void expectRedisClusterAsyncConnectReturnNullptr()
127         {
128             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
129                                                                            HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
130                 .Times(1)
131                 .WillOnce(InvokeWithoutArgs([this]()
132                                             {
133                                                 return nullptr;
134                                             }));
135         }
136
137         void expectRedisClusterAsyncSetConnectCallback()
138         {
139             expectRedisClusterAsyncSetConnectCallback(acc);
140         }
141
142         void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
143         {
144             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
145                 .Times(1)
146                 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
147                                  {
148                                      connected = cb;
149                                      return REDIS_OK;
150                                  }));
151         }
152
153         void expectRedisClusterAsyncSetDisconnectCallback()
154         {
155             expectRedisClusterAsyncSetDisconnectCallback(acc);
156         }
157
158         void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
159         {
160             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
161                 .Times(1)
162                 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
163                                  {
164                                      disconnected = cb;
165                                      return REDIS_OK;
166                                  }));
167         }
168
169         void expectCommandListQuery()
170         {
171             expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
172         }
173
174         void expectCommandListQueryReturnError()
175         {
176             expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
177         }
178
179         void expectAdapterSetup()
180         {
181             expectAdapterSetup(acc);
182         }
183
184         void expectAdapterSetup(redisClusterAsyncContext& acc)
185         {
186             EXPECT_CALL(*adapterMock, setup(&acc))
187                 .Times(1);
188         }
189
190         void expectAdapterDetach()
191         {
192             EXPECT_CALL(*adapterMock, detach(&ac))
193                 .Times(1);
194         }
195
196         void expectConnectAck()
197         {
198             EXPECT_CALL(*this, connectAck())
199                 .Times(1);
200         }
201
202         void expectDisconnectCallback()
203         {
204             EXPECT_CALL(*this, disconnectCallback())
205                 .Times(1);
206         }
207
208         void expectRedisClusterAsyncFree()
209         {
210             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
211                 .Times(1);
212         }
213
214         void expectRedisClusterAsyncDisconnect()
215         {
216             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
217                 .Times(1);
218         }
219
220         void verifyAckErrorReply(const Reply& reply)
221         {
222             EXPECT_EQ(Reply::Type::NIL, reply.getType());
223             EXPECT_EQ(0, reply.getInteger());
224             EXPECT_TRUE(reply.getString()->str.empty());
225             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
226             EXPECT_TRUE(reply.getArray()->empty());
227         }
228
229         void expectAckError()
230         {
231             EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
232                 .Times(1)
233                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
234                                  {
235                                      verifyAckErrorReply(reply);
236                                  }));
237         }
238
239         void expectAckError(const std::error_code& ec)
240         {
241             EXPECT_CALL(*this, ack(ec, _))
242                 .Times(1)
243                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
244                                  {
245                                      verifyAckErrorReply(reply);
246                                  }));
247         }
248
249         void expectArmConnectionRetryTimer()
250         {
251             EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
252                 .Times(1)
253                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
254         }
255
256         void expectDisarmConnectionRetryTimer()
257         {
258             EXPECT_CALL(engineMock, disarmTimer(_))
259                 .Times(1);
260         }
261
262         void expectRedisClusterAsyncCommandArgv(redisReply& rr)
263         {
264             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
265                 .Times(1)
266                 .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
267                                        int, const char**, const size_t*)
268                                  {
269                                      cb(acc, &rr, pd);
270                                      return REDIS_OK;
271                                  }));
272         }
273
274         void expectAck()
275         {
276             EXPECT_CALL(*this, ack(std::error_code(), _))
277                 .Times(1);
278         }
279
280         void expectReplyError(const std::string& msg)
281         {
282             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
283                 .Times(1)
284                 .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
285                                     int, int, const char**, const size_t*)
286                                  {
287                                      cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
288                                      return REDIS_OK;
289                                  }));
290         }
291
292         void expectContextError(int code)
293         {
294             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
295                 .Times(1)
296                 .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
297                                         int, const char**, const size_t*)
298                                  {
299                                      acc->err = code;
300                                      cb(acc, nullptr, pd);
301                                      return REDIS_OK;
302                                  }));
303         }
304
305         void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
306         {
307             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
308                 .Times(1)
309                 .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
310                                  {
311                                      cb(acc, &redisReplyBuilder.buildNilReply(), pd);
312                                  }));
313         }
314
315         void expectAckNotCalled()
316         {
317             EXPECT_CALL(*this, ack(_,_))
318                 .Times(0);
319         }
320
321         void expectionsForSuccessfullConnectionSetup()
322         {
323             expectationsUntilConnect();
324             expectAdapterSetup();
325             expectRedisClusterAsyncSetConnectCallback();
326             expectRedisClusterAsyncSetDisconnectCallback();
327             expectCommandListQuery();
328         }
329
330         void callConnectionRetryTimerCallback()
331         {
332             ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
333             savedConnectionRetryTimerCallback();
334         }
335     };
336
337     class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
338     {
339     public:
340         AsyncHiredisClusterCommandDispatcherDisconnectedTest()
341         {
342             InSequence dummy;
343             expectationsUntilConnect();
344             expectAdapterSetup();
345             expectRedisClusterAsyncSetConnectCallback();
346             expectRedisClusterAsyncSetDisconnectCallback();
347             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
348                 .Times(1);
349             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
350                                                                       defaultNamespace,
351                                                                       { { "addr1", 111 }, { "addr2", 222 } },
352                                                                       contentsBuilderMock,
353                                                                       false,
354                                                                       hiredisClusterSystemMock,
355                                                                       adapterMock,
356                                                                       logger));
357         }
358
359         ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
360         {
361         }
362     };
363
364     class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
365     {
366     public:
367         AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
368         {
369             InSequence dummy;
370             expectionsForSuccessfullConnectionSetup();
371             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
372                                                                       defaultNamespace,
373                                                                       { { "addr1", 111 }, { "addr2", 222 } },
374                                                                       contentsBuilderMock,
375                                                                       true,
376                                                                       hiredisClusterSystemMock,
377                                                                       adapterMock,
378                                                                       logger));
379         }
380
381         ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
382         {
383             expectRedisClusterAsyncFree();
384         }
385     };
386
387     class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
388     {
389     public:
390         redisClusterCallbackFn* savedCb;
391         void* savedPd;
392
393         AsyncHiredisClusterCommandDispatcherConnectedTest():
394             savedCb(nullptr),
395             savedPd(nullptr)
396         {
397             InSequence dummy;
398             expectionsForSuccessfullConnectionSetup();
399             dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
400                                                                       defaultNamespace,
401                                                                       { { "addr1", 111 }, { "addr2", 222 } },
402                                                                       contentsBuilderMock,
403                                                                       false,
404                                                                       hiredisClusterSystemMock,
405                                                                       adapterMock,
406                                                                       logger));
407             connected(&acc, &ac, 0);
408         }
409
410         ~AsyncHiredisClusterCommandDispatcherConnectedTest()
411         {
412             expectRedisClusterAsyncFree();
413         }
414
415         void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
416         {
417             EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
418                 .Times(1)
419                 .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
420                                         const char*, int, int, const char**, const size_t*)
421                                  {
422                                      savedCb = cb;
423                                      savedPd = pd;
424                                      return REDIS_OK;
425                                  }));
426         }
427     };
428
429     using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
430 }
431
432 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
433 {
434     EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
435     EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
436 }
437
438 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
439 {
440     EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
441 }
442
443 TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
444 {
445     Engine::Callback storedCallback;
446     EXPECT_CALL(engineMock, postCallback(_))
447         .Times(1)
448         .WillOnce(SaveArg<0>(&storedCallback));
449     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
450                                         this,
451                                         std::placeholders::_1,
452                                         std::placeholders::_2),
453                               defaultNamespace,
454                               { });
455     expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
456     storedCallback();
457 }
458
459 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
460 {
461     InSequence dummy;
462     acc.err = 123;
463     expectationsUntilConnect();
464     expectArmConnectionRetryTimer();
465     expectDisarmConnectionRetryTimer();
466
467     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
468                                                               defaultNamespace,
469                                                               { { "addr1", 111 }, { "addr2", 222 } },
470                                                               contentsBuilderMock,
471                                                               false,
472                                                               hiredisClusterSystemMock,
473                                                               adapterMock,
474                                                               logger));
475 }
476
477 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
478 {
479     InSequence dummy;
480     expectRedisClusterAsyncConnectReturnNullptr();
481     expectArmConnectionRetryTimer();
482     expectDisarmConnectionRetryTimer();
483
484     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
485                                                               defaultNamespace,
486                                                               { { "addr1", 111 }, { "addr2", 222 } },
487                                                               contentsBuilderMock,
488                                                               false,
489                                                               hiredisClusterSystemMock,
490                                                               adapterMock,
491                                                               logger));
492 }
493
494 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
495 {
496     InSequence dummy;
497     Engine::Callback storedCallback;
498     expectationsUntilConnect();
499     expectAdapterSetup();
500     expectRedisClusterAsyncSetConnectCallback();
501     expectRedisClusterAsyncSetDisconnectCallback();
502     expectCommandListQueryReturnError();
503     expectArmConnectionRetryTimer();
504
505     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
506                                                               defaultNamespace,
507                                                               { { "addr1", 111 }, { "addr2", 222 } },
508                                                               contentsBuilderMock,
509                                                               false,
510                                                               hiredisClusterSystemMock,
511                                                               adapterMock,
512                                                               logger));
513
514     expectDisarmConnectionRetryTimer();
515 }
516
517 TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
518 {
519     InSequence dummy;
520     expectRedisClusterAsyncConnectReturnNullptr();
521     expectArmConnectionRetryTimer();
522
523     dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
524                                                               defaultNamespace,
525                                                               { { "addr1", 111 }, { "addr2", 222 } },
526                                                               contentsBuilderMock,
527                                                               false,
528                                                               hiredisClusterSystemMock,
529                                                               adapterMock,
530                                                               logger));
531
532     expectionsForSuccessfullConnectionSetup();
533     expectRedisClusterAsyncFree();
534
535     callConnectionRetryTimerCallback();
536 }
537
538 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
539 {
540     Engine::Callback storedCallback;
541     EXPECT_CALL(engineMock, postCallback(_))
542         .Times(1)
543         .WillOnce(SaveArg<0>(&storedCallback));
544     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
545                                              this));
546     expectConnectAck();
547     storedCallback();
548 }
549
550 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
551 {
552     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
553         .Times(1)
554         .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
555                                 int keylen, int argc, const char** argv, const size_t* argvlen)
556                          {
557                              EXPECT_STREQ(defaultNamespace.c_str(), key);
558                              EXPECT_EQ(9, keylen);
559                              EXPECT_EQ((int)contents.stack.size(), argc);
560                              EXPECT_EQ(contents.sizes[0], argvlen[0]);
561                              EXPECT_EQ(contents.sizes[1], argvlen[1]);
562                              EXPECT_EQ(contents.sizes[2], argvlen[2]);
563                              EXPECT_EQ(contents.sizes[3], argvlen[3]);
564                              EXPECT_EQ(contents.sizes[4], argvlen[4]);
565                              EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
566                              EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
567                              EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
568                              EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
569                              EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
570                              cb(acc, &redisReplyBuilder.buildNilReply(), pd);
571                              return REDIS_OK;
572                          }));
573     expectAck();
574     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
575                                         this,
576                                         std::placeholders::_1,
577                                         std::placeholders::_2),
578                               defaultNamespace,
579                               contents);
580 }
581
582 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
583 {
584     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
585     EXPECT_CALL(*this, ack(std::error_code(), _))
586         .Times(1)
587         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
588                          {
589                              EXPECT_EQ(Reply::Type::NIL, reply.getType());
590                              EXPECT_EQ(0, reply.getInteger());
591                              EXPECT_TRUE(reply.getString()->str.empty());
592                              EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
593                              EXPECT_TRUE(reply.getArray()->empty());
594                          }));
595     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
596                                         this,
597                                         std::placeholders::_1,
598                                         std::placeholders::_2),
599                               defaultNamespace,
600                               contents);
601 }
602
603 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
604 {
605     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
606     EXPECT_CALL(*this, ack(std::error_code(), _))
607         .Times(1)
608         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
609                          {
610                              auto expected(redisReplyBuilder.buildIntegerReply());
611                              EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
612                              EXPECT_EQ(expected.integer, reply.getInteger());
613                          }));
614     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
615                                         this,
616                                         std::placeholders::_1,
617                                         std::placeholders::_2),
618                               defaultNamespace,
619                               contents);
620 }
621
622 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
623 {
624     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
625     EXPECT_CALL(*this, ack(std::error_code(), _))
626         .Times(1)
627         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
628                          {
629                              auto expected(redisReplyBuilder.buildStatusReply());
630                              EXPECT_EQ(Reply::Type::STATUS, reply.getType());
631                              EXPECT_EQ(expected.len, reply.getString()->len);
632                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
633                          }));
634     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
635                                         this,
636                                         std::placeholders::_1,
637                                         std::placeholders::_2),
638                               defaultNamespace,
639                               contents);
640 }
641
642 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
643 {
644     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
645     EXPECT_CALL(*this, ack(std::error_code(), _))
646         .Times(1)
647         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
648                          {
649                              auto expected(redisReplyBuilder.buildStringReply());
650                              EXPECT_EQ(Reply::Type::STRING, reply.getType());
651                              EXPECT_EQ(expected.len, reply.getString()->len);
652                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
653                          }));
654     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
655                                         this,
656                                         std::placeholders::_1,
657                                         std::placeholders::_2),
658                               defaultNamespace,
659                               contents);
660 }
661
662 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
663 {
664     expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
665     EXPECT_CALL(*this, ack(std::error_code(), _))
666         .Times(1)
667         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
668                          {
669                              auto array(reply.getArray());
670                              EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
671                              EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
672                              EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
673                          }));
674     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
675                                         this,
676                                         std::placeholders::_1,
677                                         std::placeholders::_2),
678                               defaultNamespace,
679                               contents);
680 }
681
682 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
683 {
684     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
685         .Times(1)
686         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
687                             const char**, const size_t*)
688                          {
689                              acc->err = REDIS_ERR;
690                              return REDIS_ERR;
691                          }));
692     Engine::Callback storedCallback;
693     EXPECT_CALL(engineMock, postCallback(_))
694         .Times(1)
695         .WillOnce(SaveArg<0>(&storedCallback));
696     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
697                                         this,
698                                         std::placeholders::_1,
699                                         std::placeholders::_2),
700                               defaultNamespace,
701                               contents);
702     expectAckError();
703     storedCallback();
704 }
705
706 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
707 {
708     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
709         .Times(1)
710         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
711                             const char**, const size_t*)
712                          {
713                              cb(acc, nullptr, pd);
714                              return REDIS_OK;
715                          }));
716     expectAckError();
717     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
718                                         this,
719                                         std::placeholders::_1,
720                                         std::placeholders::_2),
721                               defaultNamespace,
722                               contents);
723 }
724
725 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
726 {
727     expectReplyError("LOADING Redis is loading the dataset in memory");
728     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
729         .Times(1);
730     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
731                                         this,
732                                         std::placeholders::_1,
733                                         std::placeholders::_2),
734                               defaultNamespace,
735                               contents);
736 }
737
738 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
739 {
740     //SDL checks only that reply starts with CLUSTERDOWN string
741     expectReplyError("CLUSTERDOWN");
742     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
743         .Times(1);
744     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
745                                         this,
746                                         std::placeholders::_1,
747                                         std::placeholders::_2),
748                                         defaultNamespace,
749                                         contents);
750
751     expectReplyError("CLUSTERDOWN The cluster is down");
752     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
753         .Times(1);
754     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
755                                         this,
756                                         std::placeholders::_1,
757                                         std::placeholders::_2),
758                                         defaultNamespace,
759                                         contents);
760
761     expectReplyError("CLUSTERDOW");
762     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
763         .Times(1);
764     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
765                                         this,
766                                         std::placeholders::_1,
767                                         std::placeholders::_2),
768                                         defaultNamespace,
769                                         contents);
770 }
771
772 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
773 {
774     expectReplyError("ERR Protocol error: invalid bulk length");
775     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
776         .Times(1);
777     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
778                                         this,
779                                         std::placeholders::_1,
780                                         std::placeholders::_2),
781                                         defaultNamespace,
782                                         contents);
783 }
784
785 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
786 {
787     expectReplyError("something sinister");
788     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
789         .Times(1);
790     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
791                                         this,
792                                         std::placeholders::_1,
793                                         std::placeholders::_2),
794                                         defaultNamespace,
795                                         contents);
796 }
797
798 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
799 {
800     expectReplyError("");
801     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
802         .Times(1);
803     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
804                                         this,
805                                         std::placeholders::_1,
806                                         std::placeholders::_2),
807                                         defaultNamespace,
808                                         contents);
809 }
810
811 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
812 {
813     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
814         .Times(1)
815         .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
816                             const char**, const size_t*)
817                          {
818                              acc->err = REDIS_ERR_IO;
819                              errno = EINVAL;
820                              cb(acc, nullptr, pd);
821                              return REDIS_OK;
822                          }));
823     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
824         .Times(1);
825     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
826                                         this,
827                                         std::placeholders::_1,
828                                         std::placeholders::_2),
829                                         defaultNamespace,
830                                         contents);
831 }
832
833 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
834 {
835     expectContextError(REDIS_ERR_EOF);
836     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
837         .Times(1);
838     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
839                                         this,
840                                         std::placeholders::_1,
841                                         std::placeholders::_2),
842                                         defaultNamespace,
843                                         contents);
844 }
845
846 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
847 {
848     expectContextError(REDIS_ERR_PROTOCOL);
849     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
850         .Times(1);
851     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
852                                         this,
853                                         std::placeholders::_1,
854                                         std::placeholders::_2),
855                                         defaultNamespace,
856                                         contents);
857 }
858
859 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
860 {
861     expectContextError(REDIS_ERR_OOM);
862     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
863         .Times(1);
864     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
865                                         this,
866                                         std::placeholders::_1,
867                                         std::placeholders::_2),
868                                         defaultNamespace,
869                                         contents);
870 }
871
872 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
873 {
874     expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
875     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
876         .Times(1);
877     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
878                                         this,
879                                         std::placeholders::_1,
880                                         std::placeholders::_2),
881                                         defaultNamespace,
882                                         contents);
883 }
884
885 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
886 {
887     expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
888     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
889         .Times(1);
890     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
891                                         this,
892                                         std::placeholders::_1,
893                                         std::placeholders::_2),
894                                         defaultNamespace,
895                                         contents);
896 }
897
898 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
899 {
900     expectContextError(REDIS_ERR_OTHER);
901     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
902         .Times(1);
903     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
904                                         this,
905                                         std::placeholders::_1,
906                                         std::placeholders::_2),
907                                         defaultNamespace,
908                                         contents);
909 }
910
911 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
912 {
913     InSequence dummy;
914     expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
915     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
916                                         this,
917                                         std::placeholders::_1,
918                                         std::placeholders::_2),
919                                         defaultNamespace,
920                                         contents);
921     expectAck();
922     savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
923     dispatcher->disableCommandCallbacks();
924     expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
925     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
926                                         this,
927                                         std::placeholders::_1,
928                                         std::placeholders::_2),
929                                         defaultNamespace,
930                                         contents);
931     expectAckNotCalled();
932     savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
933 }
934
935 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
936 {
937     InSequence dummy;
938     expectAdapterDetach();
939     disconnected(&acc, &ac, 0);
940 }
941
942 TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
943 {
944     InSequence dummy;
945     dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
946                                              this));
947     expectAdapterDetach();
948     expectDisconnectCallback();
949     disconnected(&acc, &ac, 0);
950 }
951
952 TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
953 {
954     InSequence dummy;
955     redisClusterCallbackFn* savedCb;
956     void* savedPd;
957     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
958         .Times(1)
959         .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
960                                               const char*, int, int, const char**, const size_t*)
961                          {
962                              savedCb = cb;
963                              savedPd = pd;
964                              return REDIS_OK;
965                          }));
966     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
967     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
968                                         this,
969                                         std::placeholders::_1,
970                                         std::placeholders::_2),
971                               defaultNamespace,
972                               contents);
973     EXPECT_CALL(*this, ack(std::error_code(), _))
974             .Times(3);
975     redisReply rr;
976     rr.type = REDIS_REPLY_NIL;
977     savedCb(&acc, &rr, savedPd);
978     savedCb(&acc, &rr, savedPd);
979     savedCb(&acc, &rr, savedPd);
980 }
981
982 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
983 {
984     InSequence dummy;
985     redisClusterCallbackFn* savedCb;
986     void* savedPd;
987     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
988         .Times(1)
989         .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
990                                                     const char*, int, int, const char**, const size_t*)
991                          {
992                              savedCb = cb;
993                              savedPd = pd;
994                              cb(acc, &redisReplyBuilder.buildNilReply(), pd);
995                              return REDIS_OK;
996                          }));
997     expectAck();
998     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
999                                         this,
1000                                         std::placeholders::_1,
1001                                         std::placeholders::_2),
1002                                         defaultNamespace,
1003                                         contents);
1004     EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1005 }
1006
1007 TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
1008 {
1009     InSequence dummy;
1010     redisClusterCallbackFn* savedCb;
1011     void* savedPd;
1012     EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
1013         .Times(1)
1014         .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
1015                                               const char*, int, int, const char**, const size_t*)
1016                          {
1017                              savedCb = cb;
1018                              savedPd = pd;
1019                              return REDIS_OK;
1020                          }));
1021     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
1022     expectAck();
1023     dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1024                                         this,
1025                                         std::placeholders::_1,
1026                                         std::placeholders::_2),
1027                                         defaultNamespace,
1028                                         contents);
1029     savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
1030     EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1031 }