Add first version
[ric-plt/sdl.git] / tst / asynchirediscommanddispatcher_test.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include <type_traits>
18 #include <memory>
19 #include <cstring>
20 #include <sys/epoll.h>
21 #include <sys/timerfd.h>
22 #include <sys/eventfd.h>
23 #include <arpa/inet.h>
24 #include <gtest/gtest.h>
25 #include <async.h>
26 #include "private/createlogger.hpp"
27 #include "private/error.hpp"
28 #include "private/logger.hpp"
29 #include "private/redis/asynchirediscommanddispatcher.hpp"
30 #include "private/redis/reply.hpp"
31 #include "private/redis/contents.hpp"
32 #include "private/timer.hpp"
33 #include "private/tst/contentsbuildermock.hpp"
34 #include "private/tst/enginemock.hpp"
35 #include "private/tst/hiredissystemmock.hpp"
36 #include "private/tst/hiredisepolladaptermock.hpp"
37 #include "private/tst/redisreplybuilder.hpp"
38
39 using namespace shareddatalayer;
40 using namespace shareddatalayer::redis;
41 using namespace shareddatalayer::tst;
42 using namespace testing;
43
44 namespace
45 {
46     class AsyncHiredisCommandDispatcherBaseTest: public testing::Test
47     {
48     public:
49         std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
50         StrictMock<EngineMock> engineMock;
51         HiredisSystemMock hiredisSystemMock;
52         std::shared_ptr<HiredisEpollAdapterMock> adapterMock;
53         redisAsyncContext ac;
54         int hiredisFd;
55         std::unique_ptr<AsyncHiredisCommandDispatcher> dispatcher;
56         void (*connected)(const redisAsyncContext*, int);
57         void (*disconnected)(const redisAsyncContext*, int);
58         Timer::Callback savedConnectionRetryTimerCallback;
59         Timer::Duration expectedConnectionRetryTimerDuration;
60         Timer::Duration expectedConnectionVerificationRetryTimerDuration;
61         RedisReplyBuilder redisReplyBuilder;
62         const AsyncConnection::Namespace defaultNamespace;
63         std::shared_ptr<Logger> logger;
64
65         AsyncHiredisCommandDispatcherBaseTest():
66             contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
67             adapterMock(std::make_shared<HiredisEpollAdapterMock>(engineMock, hiredisSystemMock)),
68             ac { },
69             hiredisFd(3),
70             connected(nullptr),
71             disconnected(nullptr),
72             expectedConnectionRetryTimerDuration(std::chrono::seconds(1)),
73             expectedConnectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
74             redisReplyBuilder { },
75             defaultNamespace("namespace"),
76             logger(createLogger(SDL_LOG_PREFIX))
77         {
78         }
79
80         virtual ~AsyncHiredisCommandDispatcherBaseTest()
81         {
82         }
83
84         MOCK_METHOD0(connectAck, void());
85
86         MOCK_METHOD0(disconnectCallback, void());
87
88         MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
89
90         void expectationsUntilConnect()
91         {
92             expectationsUntilConnect(ac);
93         }
94
95         void expectationsUntilConnect(redisAsyncContext& ac)
96         {
97             expectRedisAsyncConnect(ac);
98         }
99
100         void expectRedisAsyncConnect()
101         {
102             expectRedisAsyncConnect(ac);
103         }
104
105         void expectRedisAsyncConnect(redisAsyncContext& ac)
106         {
107             EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
108                 .Times(1)
109                 .WillOnce(InvokeWithoutArgs([this, &ac]()
110                                             {
111                                                 ac.c.fd = hiredisFd;
112                                                 return &ac;
113                                             }));
114         }
115
116         void expectRedisAsyncConnectReturnNullptr()
117         {
118             EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
119                 .Times(1)
120                 .WillOnce(InvokeWithoutArgs([this]()
121                                             {
122                                                 ac.c.fd = hiredisFd;
123                                                 return nullptr;
124                                             }));
125         }
126
127         void expectRedisAsyncSetConnectCallback()
128         {
129             expectRedisAsyncSetConnectCallback(ac);
130         }
131
132         void expectRedisAsyncSetConnectCallback(redisAsyncContext& ac)
133         {
134             EXPECT_CALL(hiredisSystemMock, redisAsyncSetConnectCallback(&ac, _))
135                 .Times(1)
136                 .WillOnce(Invoke([this](const redisAsyncContext*, redisConnectCallback* cb)
137                                  {
138                                      connected = cb;
139                                      return REDIS_OK;
140                                  }));
141         }
142
143         void expectRedisAsyncSetDisconnectCallback()
144         {
145             expectRedisAsyncSetDisconnectCallback(ac);
146         }
147
148         void expectRedisAsyncSetDisconnectCallback(redisAsyncContext& ac)
149         {
150             EXPECT_CALL(hiredisSystemMock, redisAsyncSetDisconnectCallback(&ac, _))
151                 .Times(1)
152                 .WillOnce(Invoke([this](const redisAsyncContext*, redisDisconnectCallback* cb)
153                                  {
154                                      disconnected = cb;
155                                      return REDIS_OK;
156                                  }));
157         }
158
159         void expectAdapterAttach()
160         {
161             expectAdapterAttach(ac);
162         }
163
164         void expectAdapterAttach(redisAsyncContext& ac)
165         {
166             EXPECT_CALL(*adapterMock, attach(&ac))
167                 .Times(1);
168         }
169
170         void expectConnectAck()
171         {
172             EXPECT_CALL(*this, connectAck())
173                 .Times(1);
174         }
175
176         void expectDisconnectCallback()
177         {
178             EXPECT_CALL(*this, disconnectCallback())
179                 .Times(1);
180         }
181
182         void expectRedisAsyncFree()
183         {
184             EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
185                 .Times(1);
186         }
187
188         void expectRedisAsyncDisconnect()
189         {
190             EXPECT_CALL(hiredisSystemMock, redisAsyncDisconnect(&ac))
191                 .Times(1);
192         }
193
194         void expectRedisAsyncCommandArgv(redisReply& rr)
195         {
196             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
197                 .Times(1)
198                 .WillOnce(Invoke([&rr](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
199                                        int, const char**, const size_t*)
200                                  {
201                                      cb(ac, &rr, pd);
202                                      return REDIS_OK;
203                                  }));
204         }
205
206         void expectCommandListQuery()
207         {
208             expectRedisAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
209         }
210
211         void expectCommandListQueryReturnError()
212         {
213             expectRedisAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
214         }
215
216         void verifyAckErrorReply(const Reply& reply)
217         {
218             EXPECT_EQ(Reply::Type::NIL, reply.getType());
219             EXPECT_EQ(0, reply.getInteger());
220             EXPECT_TRUE(reply.getString()->str.empty());
221             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
222             EXPECT_TRUE(reply.getArray()->empty());
223         }
224
225         void expectAckError()
226         {
227             EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
228                 .Times(1)
229                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
230                                  {
231                                      verifyAckErrorReply(reply);
232                                  }));
233         }
234
235         void expectAckError(const std::error_code& ec)
236         {
237             EXPECT_CALL(*this, ack(ec, _))
238                 .Times(1)
239                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
240                                  {
241                                      verifyAckErrorReply(reply);
242                                  }));
243         }
244
245         void expectArmConnectionRetryTimer()
246         {
247             EXPECT_CALL(engineMock, armTimer(_, expectedConnectionRetryTimerDuration, _))
248                 .Times(1)
249                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
250         }
251
252         void expectArmConnectionVerificationRetryTimer()
253         {
254             EXPECT_CALL(engineMock, armTimer(_, expectedConnectionVerificationRetryTimerDuration, _))
255                 .Times(1)
256                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
257         }
258
259         void expectDisarmConnectionRetryTimer()
260         {
261             EXPECT_CALL(engineMock, disarmTimer(_))
262                 .Times(1);
263         }
264     };
265
266     class AsyncHiredisCommandDispatcherDisconnectedTest: public AsyncHiredisCommandDispatcherBaseTest
267     {
268     public:
269         AsyncHiredisCommandDispatcherDisconnectedTest()
270         {
271                 InSequence dummy;
272                 expectationsUntilConnect();
273                 expectAdapterAttach();
274                 expectRedisAsyncSetConnectCallback();
275                 expectRedisAsyncSetDisconnectCallback();
276                 dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
277                                                                    "host",
278                                                                    htons(6379U),
279                                                                    contentsBuilderMock,
280                                                                    false,
281                                                                    hiredisSystemMock,
282                                                                    adapterMock,
283                                                                    logger));
284         }
285
286         ~AsyncHiredisCommandDispatcherDisconnectedTest()
287         {
288         }
289     };
290
291     class AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisCommandDispatcherBaseTest
292     {
293     public:
294         AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
295         {
296                 InSequence dummy;
297                 expectationsUntilConnect();
298                 expectAdapterAttach();
299                 expectRedisAsyncSetConnectCallback();
300                 expectRedisAsyncSetDisconnectCallback();
301                 dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
302                                                                    "host",
303                                                                    htons(6379U),
304                                                                    contentsBuilderMock,
305                                                                    true,
306                                                                    hiredisSystemMock,
307                                                                    adapterMock,
308                                                                    logger));
309         }
310
311         ~AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
312         {
313             expectRedisAsyncFree();
314         }
315     };
316
317     class AsyncHiredisCommandDispatcherConnectedTest: public AsyncHiredisCommandDispatcherDisconnectedTest
318     {
319     public:
320         Contents contents;
321         redisCallbackFn* savedCb;
322         void* savedPd;
323
324         AsyncHiredisCommandDispatcherConnectedTest():
325             contents { { "CMD", "key1", "value1", "key2", "value2" },
326                        { 3, 4, 6, 4, 6 } },
327             savedCb(nullptr),
328             savedPd(nullptr)
329         {
330             expectCommandListQuery();
331             connected(&ac, 0);
332         }
333
334         ~AsyncHiredisCommandDispatcherConnectedTest()
335         {
336             expectRedisAsyncFree();
337         }
338
339         void expectAck()
340         {
341             EXPECT_CALL(*this, ack(std::error_code(), _))
342                 .Times(1);
343         }
344
345         void expectReplyError(const std::string& msg)
346         {
347             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
348                 .Times(1)
349                 .WillOnce(Invoke([this, msg](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
350                                     int, const char**, const size_t*)
351                                  {
352                                      cb(ac, &redisReplyBuilder.buildErrorReply(msg), pd);
353                                      return REDIS_OK;
354                                  }));
355         }
356
357         void expectContextError(int code)
358         {
359             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
360                 .Times(1)
361                 .WillOnce(Invoke([code](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
362                                  {
363                                      ac->err = code;
364                                      cb(ac, nullptr, pd);
365                                      return REDIS_OK;
366                                  }));
367         }
368
369         void expectRedisAsyncFreeCallPendingCallback(redisCallbackFn* cb, void* pd)
370         {
371             EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
372                 .Times(1)
373                 .WillOnce(Invoke([cb, pd](redisAsyncContext* ac)
374                                  {
375                                      cb(ac, nullptr, pd);
376                                  }));
377         }
378
379         void expectAckNotCalled()
380         {
381             EXPECT_CALL(*this, ack(_,_))
382                 .Times(0);
383         }
384
385         void expectRedisAsyncCommandArgv_SaveCb()
386         {
387             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
388                 .Times(1)
389                 .WillRepeatedly(Invoke([this](redisAsyncContext*, redisCallbackFn* cb, void* pd,
390                                               int, const char**, const size_t*)
391                                               {
392                                                   savedCb = cb;
393                                                   savedPd = pd;
394                                                   return REDIS_OK;
395                                               }));
396         }
397     };
398
399     using AsyncHiredisCommandDispatcherDeathTest = AsyncHiredisCommandDispatcherConnectedTest;
400 }
401
402 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, IsNotCopyable)
403 {
404     EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisCommandDispatcher>::value);
405     EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisCommandDispatcher>::value);
406 }
407
408 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
409 {
410     EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisCommandDispatcher>::value));
411 }
412
413 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
414 {
415     Engine::Callback storedCallback;
416     EXPECT_CALL(engineMock, postCallback(_))
417         .Times(1)
418         .WillOnce(SaveArg<0>(&storedCallback));
419     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::ack,
420                                         this,
421                                         std::placeholders::_1,
422                                         std::placeholders::_2),
423                               defaultNamespace,
424                               { });
425     expectAckError(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED));
426     storedCallback();
427 }
428
429 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ContextErrorInConnectArmsRetryTimer)
430 {
431     InSequence dummy;
432     expectationsUntilConnect();
433     expectArmConnectionRetryTimer();
434     ac.err = 123;
435     dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
436                                                        "host",
437                                                        htons(6379U),
438                                                        contentsBuilderMock,
439                                                        false,
440                                                        hiredisSystemMock,
441                                                        adapterMock,
442                                                        logger));
443     expectDisarmConnectionRetryTimer();
444 }
445
446 TEST_F(AsyncHiredisCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
447 {
448     InSequence dummy;
449     expectRedisAsyncConnectReturnNullptr();
450     expectArmConnectionRetryTimer();
451     expectDisarmConnectionRetryTimer();
452
453     dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
454                                                        "host",
455                                                        htons(6379U),
456                                                        contentsBuilderMock,
457                                                        false,
458                                                        hiredisSystemMock,
459                                                        adapterMock,
460                                                        logger));
461 }
462
463 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, FailedCommandListQueryArmsRetryTimer)
464 {
465     InSequence dummy;
466     expectCommandListQueryReturnError();
467     expectArmConnectionVerificationRetryTimer();
468     expectRedisAsyncFree();
469     connected(&ac, 0);
470     expectDisarmConnectionRetryTimer();
471 }
472
473 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ErrorInConnectedCallbackArmsRetryTimer)
474 {
475     InSequence dummy;
476     expectArmConnectionRetryTimer();
477     connected(&ac, -1);
478     expectDisarmConnectionRetryTimer();
479 }
480
481 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectionSucceedsWithRetryTimer)
482 {
483     InSequence dummy;
484     expectArmConnectionRetryTimer();
485
486     connected(&ac, -1);
487
488     expectationsUntilConnect();
489     expectAdapterAttach();
490     expectRedisAsyncSetConnectCallback();
491     expectRedisAsyncSetDisconnectCallback();
492
493     savedConnectionRetryTimerCallback();
494
495     expectCommandListQuery();
496     expectConnectAck();
497
498     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
499     connected(&ac, 0);
500
501     expectRedisAsyncFree();
502 }
503
504 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledOnceConnected)
505 {
506     InSequence dummy;
507     expectCommandListQuery();
508     expectConnectAck();
509     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
510     connected(&ac, 0);
511     expectRedisAsyncFree();
512 }
513
514 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledIfConnected)
515 {
516     Engine::Callback storedCallback;
517     EXPECT_CALL(engineMock, postCallback(_))
518         .Times(1)
519         .WillOnce(SaveArg<0>(&storedCallback));
520     expectCommandListQuery();
521     connected(&ac, 0);
522     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
523     expectConnectAck();
524     storedCallback();
525     expectRedisAsyncFree();
526 }
527
528 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanDispatchCommands)
529 {
530     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
531         .Times(1)
532         .WillOnce(Invoke([this](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
533                                 int argc, const char** argv, const size_t* argvlen)
534                          {
535                              EXPECT_EQ((int)contents.stack.size(), argc);
536                              EXPECT_EQ(contents.sizes[0], argvlen[0]);
537                              EXPECT_EQ(contents.sizes[1], argvlen[1]);
538                              EXPECT_EQ(contents.sizes[2], argvlen[2]);
539                              EXPECT_EQ(contents.sizes[3], argvlen[3]);
540                              EXPECT_EQ(contents.sizes[4], argvlen[4]);
541                              EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
542                              EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
543                              EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
544                              EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
545                              EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
546                              cb(ac, &redisReplyBuilder.buildNilReply(), pd);
547                              return REDIS_OK;
548                          }));
549     expectAck();
550     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
551                                         this,
552                                         std::placeholders::_1,
553                                         std::placeholders::_2),
554                               defaultNamespace,
555                               contents);
556 }
557
558 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseNilReply)
559 {
560     expectRedisAsyncCommandArgv(redisReplyBuilder.buildNilReply());
561     EXPECT_CALL(*this, ack(std::error_code(), _))
562         .Times(1)
563         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
564                          {
565                              EXPECT_EQ(Reply::Type::NIL, reply.getType());
566                              EXPECT_EQ(0, reply.getInteger());
567                              EXPECT_TRUE(reply.getString()->str.empty());
568                              EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
569                              EXPECT_TRUE(reply.getArray()->empty());
570                          }));
571     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
572                                         this,
573                                         std::placeholders::_1,
574                                         std::placeholders::_2),
575                                         defaultNamespace,
576                                         contents);
577 }
578
579 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseIntegerReply)
580 {
581     expectRedisAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
582     EXPECT_CALL(*this, ack(std::error_code(), _))
583         .Times(1)
584         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
585                          {
586                              auto expected(redisReplyBuilder.buildIntegerReply());
587                              EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
588                              EXPECT_EQ(expected.integer, reply.getInteger());
589                          }));
590     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
591                                         this,
592                                         std::placeholders::_1,
593                                         std::placeholders::_2),
594                                         defaultNamespace,
595                                         contents);
596 }
597
598 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStatusReply)
599 {
600     expectRedisAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
601     EXPECT_CALL(*this, ack(std::error_code(), _))
602         .Times(1)
603         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
604                          {
605                              auto expected(redisReplyBuilder.buildStatusReply());
606                              EXPECT_EQ(Reply::Type::STATUS, reply.getType());
607                              EXPECT_EQ(expected.len, reply.getString()->len);
608                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
609                          }));
610     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
611                                         this,
612                                         std::placeholders::_1,
613                                         std::placeholders::_2),
614                                         defaultNamespace,
615                                         contents);
616 }
617
618 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStringReply)
619 {
620     expectRedisAsyncCommandArgv(redisReplyBuilder.buildStringReply());
621     EXPECT_CALL(*this, ack(std::error_code(), _))
622         .Times(1)
623         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
624                          {
625                              auto expected(redisReplyBuilder.buildStringReply());
626                              EXPECT_EQ(Reply::Type::STRING, reply.getType());
627                              EXPECT_EQ(expected.len, reply.getString()->len);
628                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
629                          }));
630     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
631                                         this,
632                                         std::placeholders::_1,
633                                         std::placeholders::_2),
634                                         defaultNamespace,
635                                         contents);
636 }
637
638 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseArrayReply)
639 {
640     expectRedisAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
641     EXPECT_CALL(*this, ack(std::error_code(), _))
642         .Times(1)
643         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
644                          {
645                              auto array(reply.getArray());
646                              EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
647                              EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
648                              EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
649                          }));
650     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
651                                         this,
652                                         std::placeholders::_1,
653                                         std::placeholders::_2),
654                                         defaultNamespace,
655                                         contents);
656 }
657
658 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
659 {
660     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
661         .Times(1)
662         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn*, void*, int, const char**, const size_t*)
663                          {
664                              ac->err = REDIS_ERR;
665                              return REDIS_ERR;
666                          }));
667     Engine::Callback storedCallback;
668     EXPECT_CALL(engineMock, postCallback(_))
669         .Times(1)
670         .WillOnce(SaveArg<0>(&storedCallback));
671     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
672                                         this,
673                                         std::placeholders::_1,
674                                         std::placeholders::_2),
675                                         defaultNamespace,
676                                         contents);
677     expectAckError();
678     storedCallback();
679 }
680
681 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
682 {
683     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
684         .Times(1)
685         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
686                          {
687                              cb(ac, nullptr, pd);
688                              return REDIS_OK;
689                          }));
690     expectAckError();
691     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
692                                         this,
693                                         std::placeholders::_1,
694                                         std::placeholders::_2),
695                                         defaultNamespace,
696                                         contents);
697 }
698
699 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
700 {
701     expectReplyError("LOADING Redis is loading the dataset in memory");
702     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
703         .Times(1);
704     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
705                                         this,
706                                         std::placeholders::_1,
707                                         std::placeholders::_2),
708                                         defaultNamespace,
709                                         contents);
710 }
711
712 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
713 {
714     expectReplyError("ERR Protocol error: invalid bulk length");
715     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
716         .Times(1);
717     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
718                                         this,
719                                         std::placeholders::_1,
720                                         std::placeholders::_2),
721                                         defaultNamespace,
722                                         contents);
723 }
724
725 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
726 {
727     expectReplyError("something sinister");
728     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
729         .Times(1);
730     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
731                                         this,
732                                         std::placeholders::_1,
733                                         std::placeholders::_2),
734                                         defaultNamespace,
735                                         contents);
736 }
737
738 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContext)
739 {
740     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
741         .Times(1)
742         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
743                          {
744                              ac->err = REDIS_ERR_IO;
745                              errno = EINVAL;
746                              cb(ac, nullptr, pd);
747                              return REDIS_OK;
748                          }));
749     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
750         .Times(1);
751     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
752                                         this,
753                                         std::placeholders::_1,
754                                         std::placeholders::_2),
755                                         defaultNamespace,
756                                         contents);
757 }
758
759 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContextWithECONNRESETerrnoValue)
760 {
761     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
762         .Times(1)
763         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
764                          {
765                              ac->err = REDIS_ERR_IO;
766                              errno = ECONNRESET;
767                              cb(ac, nullptr, pd);
768                              return REDIS_OK;
769                          }));
770     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
771         .Times(1);
772     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
773                                         this,
774                                         std::placeholders::_1,
775                                         std::placeholders::_2),
776                                         defaultNamespace,
777                                         contents);
778 }
779
780 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, EofErrorInContext)
781 {
782     expectContextError(REDIS_ERR_EOF);
783     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
784         .Times(1);
785     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
786                                         this,
787                                         std::placeholders::_1,
788                                         std::placeholders::_2),
789                                         defaultNamespace,
790                                         contents);
791 }
792
793 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorInContext)
794 {
795     expectContextError(REDIS_ERR_PROTOCOL);
796     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
797         .Times(1);
798     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
799                                         this,
800                                         std::placeholders::_1,
801                                         std::placeholders::_2),
802                                         defaultNamespace,
803                                         contents);
804 }
805
806 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, OomErrorInContext)
807 {
808     expectContextError(REDIS_ERR_OOM);
809     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
810         .Times(1);
811     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
812                                         this,
813                                         std::placeholders::_1,
814                                         std::placeholders::_2),
815                                         defaultNamespace,
816                                         contents);
817 }
818
819 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
820 {
821     expectContextError(REDIS_ERR_OTHER);
822     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
823         .Times(1);
824     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
825                                         this,
826                                         std::placeholders::_1,
827                                         std::placeholders::_2),
828                                         defaultNamespace,
829                                         contents);
830 }
831
832 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
833 {
834     InSequence dummy;
835     expectRedisAsyncCommandArgv_SaveCb();
836     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
837                                         this,
838                                         std::placeholders::_1,
839                                         std::placeholders::_2),
840                                         defaultNamespace,
841                                         contents);
842     expectAck();
843     savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
844     dispatcher->disableCommandCallbacks();
845     expectRedisAsyncCommandArgv_SaveCb();
846     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
847                                         this,
848                                         std::placeholders::_1,
849                                         std::placeholders::_2),
850                                         defaultNamespace,
851                                         contents);
852     expectAckNotCalled();
853     savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
854 }
855
856 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
857 {
858     InSequence dummy;
859     dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::disconnectCallback,
860                                              this));
861     expectDisconnectCallback();
862     expectArmConnectionRetryTimer();
863     disconnected(&ac, 0);
864     expectDisarmConnectionRetryTimer();
865     expectCommandListQuery();
866     connected(&ac, 0); // restore connection to meet destructor expectations
867 }
868
869 TEST_F(AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
870 {
871     InSequence dummy;
872     redisCallbackFn* savedCb;
873     void* savedPd;
874     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
875     expectCommandListQuery();
876     connected(&ac, 0);
877     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
878         .Times(1)
879         .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
880                                               int, const char**, const size_t*)
881                          {
882                              savedCb = cb;
883                              savedPd = pd;
884                              return REDIS_OK;
885                          }));
886     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
887                                         this,
888                                         std::placeholders::_1,
889                                         std::placeholders::_2),
890                                         defaultNamespace,
891                                         contents);
892     EXPECT_CALL(*this, ack(std::error_code(), _))
893             .Times(3);
894     redisReply rr;
895     rr.type = REDIS_REPLY_NIL;
896     savedCb(&ac, &rr, savedPd);
897     savedCb(&ac, &rr, savedPd);
898     savedCb(&ac, &rr, savedPd);
899 }
900
901 TEST_F(AsyncHiredisCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
902 {
903     redisCallbackFn* savedCb;
904     void* savedPd;
905     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
906         .Times(1)
907         .WillOnce(Invoke([this, &savedCb, &savedPd](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
908                                                     int, const char**, const size_t*)
909                          {
910                              savedCb = cb;
911                              savedPd = pd;
912                              cb(ac, &redisReplyBuilder.buildNilReply(), pd);
913                              return REDIS_OK;
914                          }));
915     expectAck();
916     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDeathTest::ack,
917                                         this,
918                                         std::placeholders::_1,
919                                         std::placeholders::_2),
920                                         defaultNamespace,
921                                         contents);
922     EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
923 }
924
925 TEST_F(AsyncHiredisCommandDispatcherDeathTest, TooManyRepliesAborts)
926 {
927     InSequence dummy;
928     redisCallbackFn* savedCb;
929     void* savedPd;
930     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
931     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
932         .Times(1)
933         .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
934                                               int, const char**, const size_t*)
935                          {
936                              savedCb = cb;
937                              savedPd = pd;
938                              return REDIS_OK;
939                          }));
940     expectAck();
941     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
942                                         this,
943                                         std::placeholders::_1,
944                                         std::placeholders::_2),
945                                         defaultNamespace,
946                                         contents);
947     savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd);
948     EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
949 }