RIC:1060: Change in PTL
[ric-plt/sdl.git] / tst / asynchirediscommanddispatcher_test.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include <type_traits>
23 #include <memory>
24 #include <cstring>
25 #include <sys/epoll.h>
26 #include <sys/timerfd.h>
27 #include <sys/eventfd.h>
28 #include <arpa/inet.h>
29 #include <gtest/gtest.h>
30 #include <async.h>
31 #include "private/createlogger.hpp"
32 #include "private/error.hpp"
33 #include "private/logger.hpp"
34 #include "private/redis/asynchirediscommanddispatcher.hpp"
35 #include "private/redis/reply.hpp"
36 #include "private/redis/contents.hpp"
37 #include "private/timer.hpp"
38 #include "private/tst/contentsbuildermock.hpp"
39 #include "private/tst/enginemock.hpp"
40 #include "private/tst/hiredissystemmock.hpp"
41 #include "private/tst/hiredisepolladaptermock.hpp"
42 #include "private/tst/redisreplybuilder.hpp"
43
44 using namespace shareddatalayer;
45 using namespace shareddatalayer::redis;
46 using namespace shareddatalayer::tst;
47 using namespace testing;
48
49 namespace
50 {
51     class AsyncHiredisCommandDispatcherBaseTest: public testing::Test
52     {
53     public:
54         std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
55         StrictMock<EngineMock> engineMock;
56         HiredisSystemMock hiredisSystemMock;
57         std::shared_ptr<HiredisEpollAdapterMock> adapterMock;
58         redisAsyncContext ac;
59         int hiredisFd;
60         std::unique_ptr<AsyncHiredisCommandDispatcher> dispatcher;
61         void (*connected)(const redisAsyncContext*, int);
62         void (*disconnected)(const redisAsyncContext*, int);
63         Timer::Callback savedConnectionRetryTimerCallback;
64         Timer::Duration expectedConnectionRetryTimerDuration;
65         Timer::Duration expectedConnectionVerificationRetryTimerDuration;
66         RedisReplyBuilder redisReplyBuilder;
67         const AsyncConnection::Namespace defaultNamespace;
68         std::shared_ptr<Logger> logger;
69
70         AsyncHiredisCommandDispatcherBaseTest():
71             contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
72             adapterMock(std::make_shared<HiredisEpollAdapterMock>(engineMock, hiredisSystemMock)),
73             ac { },
74             hiredisFd(3),
75             connected(nullptr),
76             disconnected(nullptr),
77             expectedConnectionRetryTimerDuration(std::chrono::seconds(1)),
78             expectedConnectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
79             redisReplyBuilder { },
80             defaultNamespace("namespace"),
81             logger(createLogger(SDL_LOG_PREFIX))
82         {
83         }
84
85         virtual ~AsyncHiredisCommandDispatcherBaseTest()
86         {
87         }
88
89         MOCK_METHOD0(connectAck, void());
90
91         MOCK_METHOD0(disconnectCallback, void());
92
93         MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
94
95         void expectationsUntilConnect()
96         {
97             expectationsUntilConnect(ac);
98         }
99
100         void expectationsUntilConnect(redisAsyncContext& ac)
101         {
102             expectRedisAsyncConnect(ac);
103         }
104
105         void expectRedisAsyncConnect()
106         {
107             expectRedisAsyncConnect(ac);
108         }
109
110         void expectRedisAsyncConnect(redisAsyncContext& ac)
111         {
112             EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
113                 .Times(1)
114                 .WillOnce(InvokeWithoutArgs([this, &ac]()
115                                             {
116                                                 ac.c.fd = hiredisFd;
117                                                 return &ac;
118                                             }));
119         }
120
121         void expectRedisAsyncConnectReturnNullptr()
122         {
123             EXPECT_CALL(hiredisSystemMock, redisAsyncConnect(StrEq("host"), 6379U))
124                 .Times(1)
125                 .WillOnce(InvokeWithoutArgs([this]()
126                                             {
127                                                 ac.c.fd = hiredisFd;
128                                                 return nullptr;
129                                             }));
130         }
131
132         void expectRedisAsyncSetConnectCallback()
133         {
134             expectRedisAsyncSetConnectCallback(ac);
135         }
136
137         void expectRedisAsyncSetConnectCallback(redisAsyncContext& ac)
138         {
139             EXPECT_CALL(hiredisSystemMock, redisAsyncSetConnectCallback(&ac, _))
140                 .Times(1)
141                 .WillOnce(Invoke([this](const redisAsyncContext*, redisConnectCallback* cb)
142                                  {
143                                      connected = cb;
144                                      return REDIS_OK;
145                                  }));
146         }
147
148         void expectRedisAsyncSetDisconnectCallback()
149         {
150             expectRedisAsyncSetDisconnectCallback(ac);
151         }
152
153         void expectRedisAsyncSetDisconnectCallback(redisAsyncContext& ac)
154         {
155             EXPECT_CALL(hiredisSystemMock, redisAsyncSetDisconnectCallback(&ac, _))
156                 .Times(1)
157                 .WillOnce(Invoke([this](const redisAsyncContext*, redisDisconnectCallback* cb)
158                                  {
159                                      disconnected = cb;
160                                      return REDIS_OK;
161                                  }));
162         }
163
164         void expectAdapterAttach()
165         {
166             expectAdapterAttach(ac);
167         }
168
169         void expectAdapterAttach(redisAsyncContext& ac)
170         {
171             EXPECT_CALL(*adapterMock, attach(&ac))
172                 .Times(1);
173         }
174
175         void expectConnectAck()
176         {
177             EXPECT_CALL(*this, connectAck())
178                 .Times(1);
179         }
180
181         void expectDisconnectCallback()
182         {
183             EXPECT_CALL(*this, disconnectCallback())
184                 .Times(1);
185         }
186
187         void expectRedisAsyncFree()
188         {
189             EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
190                 .Times(1);
191         }
192
193         void expectRedisAsyncDisconnect()
194         {
195             EXPECT_CALL(hiredisSystemMock, redisAsyncDisconnect(&ac))
196                 .Times(1);
197         }
198
199         void expectRedisAsyncCommandArgv(redisReply& rr)
200         {
201             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
202                 .Times(1)
203                 .WillOnce(Invoke([&rr](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
204                                        int, const char**, const size_t*)
205                                  {
206                                      cb(ac, &rr, pd);
207                                      return REDIS_OK;
208                                  }));
209         }
210
211         void expectCommandListQuery()
212         {
213             expectRedisAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
214         }
215
216         void expectCommandListQueryReturnError()
217         {
218             expectRedisAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
219         }
220
221         void verifyAckErrorReply(const Reply& reply)
222         {
223             EXPECT_EQ(Reply::Type::NIL, reply.getType());
224             EXPECT_EQ(0, reply.getInteger());
225             EXPECT_TRUE(reply.getString()->str.empty());
226             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
227             EXPECT_TRUE(reply.getArray()->empty());
228         }
229
230         void expectAckError()
231         {
232             EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
233                 .Times(1)
234                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
235                                  {
236                                      verifyAckErrorReply(reply);
237                                  }));
238         }
239
240         void expectAckError(const std::error_code& ec)
241         {
242             EXPECT_CALL(*this, ack(ec, _))
243                 .Times(1)
244                 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
245                                  {
246                                      verifyAckErrorReply(reply);
247                                  }));
248         }
249
250         void expectArmConnectionRetryTimer()
251         {
252             EXPECT_CALL(engineMock, armTimer(_, expectedConnectionRetryTimerDuration, _))
253                 .Times(1)
254                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
255         }
256
257         void expectArmConnectionVerificationRetryTimer()
258         {
259             EXPECT_CALL(engineMock, armTimer(_, expectedConnectionVerificationRetryTimerDuration, _))
260                 .Times(1)
261                 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
262         }
263
264         void expectDisarmConnectionRetryTimer()
265         {
266             EXPECT_CALL(engineMock, disarmTimer(_))
267                 .Times(1);
268         }
269     };
270
271     class AsyncHiredisCommandDispatcherDisconnectedTest: public AsyncHiredisCommandDispatcherBaseTest
272     {
273     public:
274         AsyncHiredisCommandDispatcherDisconnectedTest()
275         {
276                 InSequence dummy;
277                 expectationsUntilConnect();
278                 expectAdapterAttach();
279                 expectRedisAsyncSetConnectCallback();
280                 expectRedisAsyncSetDisconnectCallback();
281                 dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
282                                                                    "host",
283                                                                    htons(6379U),
284                                                                    contentsBuilderMock,
285                                                                    false,
286                                                                    hiredisSystemMock,
287                                                                    adapterMock,
288                                                                    logger,
289                                                                    false));
290         }
291
292         ~AsyncHiredisCommandDispatcherDisconnectedTest()
293         {
294         }
295     };
296
297     class AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisCommandDispatcherBaseTest
298     {
299     public:
300         AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
301         {
302                 InSequence dummy;
303                 expectationsUntilConnect();
304                 expectAdapterAttach();
305                 expectRedisAsyncSetConnectCallback();
306                 expectRedisAsyncSetDisconnectCallback();
307                 dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
308                                                                    "host",
309                                                                    htons(6379U),
310                                                                    contentsBuilderMock,
311                                                                    true,
312                                                                    hiredisSystemMock,
313                                                                    adapterMock,
314                                                                    logger,
315                                                                    false));
316         }
317
318         ~AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest()
319         {
320             expectRedisAsyncFree();
321         }
322     };
323
324     class AsyncHiredisCommandDispatcherConnectedTest: public AsyncHiredisCommandDispatcherDisconnectedTest
325     {
326     public:
327         Contents contents;
328         redisCallbackFn* savedCb;
329         void* savedPd;
330
331         AsyncHiredisCommandDispatcherConnectedTest():
332             contents { { "CMD", "key1", "value1", "key2", "value2" },
333                        { 3, 4, 6, 4, 6 } },
334             savedCb(nullptr),
335             savedPd(nullptr)
336         {
337             expectCommandListQuery();
338             connected(&ac, 0);
339         }
340
341         ~AsyncHiredisCommandDispatcherConnectedTest()
342         {
343             expectRedisAsyncFree();
344         }
345
346         void expectAck()
347         {
348             EXPECT_CALL(*this, ack(std::error_code(), _))
349                 .Times(1);
350         }
351
352         void expectReplyError(const std::string& msg)
353         {
354             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
355                 .Times(1)
356                 .WillOnce(Invoke([this, msg](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
357                                     int, const char**, const size_t*)
358                                  {
359                                      cb(ac, &redisReplyBuilder.buildErrorReply(msg), pd);
360                                      return REDIS_OK;
361                                  }));
362         }
363
364         void expectContextError(int code)
365         {
366             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
367                 .Times(1)
368                 .WillOnce(Invoke([code](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
369                                  {
370                                      ac->err = code;
371                                      cb(ac, nullptr, pd);
372                                      return REDIS_OK;
373                                  }));
374         }
375
376         void expectRedisAsyncFreeCallPendingCallback(redisCallbackFn* cb, void* pd)
377         {
378             EXPECT_CALL(hiredisSystemMock, redisAsyncFree(&ac))
379                 .Times(1)
380                 .WillOnce(Invoke([cb, pd](redisAsyncContext* ac)
381                                  {
382                                      cb(ac, nullptr, pd);
383                                  }));
384         }
385
386         void expectAckNotCalled()
387         {
388             EXPECT_CALL(*this, ack(_,_))
389                 .Times(0);
390         }
391
392         void expectRedisAsyncCommandArgv_SaveCb()
393         {
394             EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
395                 .Times(1)
396                 .WillRepeatedly(Invoke([this](redisAsyncContext*, redisCallbackFn* cb, void* pd,
397                                               int, const char**, const size_t*)
398                                               {
399                                                   savedCb = cb;
400                                                   savedPd = pd;
401                                                   return REDIS_OK;
402                                               }));
403         }
404     };
405
406     using AsyncHiredisCommandDispatcherDeathTest = AsyncHiredisCommandDispatcherConnectedTest;
407
408     class AsyncHiredisCommandDispatcherForSentinelTest: public AsyncHiredisCommandDispatcherBaseTest
409     {
410     public:
411         AsyncHiredisCommandDispatcherForSentinelTest()
412         {
413                 InSequence dummy;
414                 expectationsUntilConnect();
415                 expectAdapterAttach();
416                 expectRedisAsyncSetConnectCallback();
417                 expectRedisAsyncSetDisconnectCallback();
418                 dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
419                                                                    "host",
420                                                                    htons(6379U),
421                                                                    contentsBuilderMock,
422                                                                    true,
423                                                                    hiredisSystemMock,
424                                                                    adapterMock,
425                                                                    logger,
426                                                                    true));
427         }
428
429         ~AsyncHiredisCommandDispatcherForSentinelTest()
430         {
431             expectRedisAsyncFree();
432         }
433     };
434 }
435
436 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, IsNotCopyable)
437 {
438     EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisCommandDispatcher>::value);
439     EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisCommandDispatcher>::value);
440 }
441
442 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
443 {
444     EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisCommandDispatcher>::value));
445 }
446
447 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
448 {
449     Engine::Callback storedCallback;
450     EXPECT_CALL(engineMock, postCallback(_))
451         .Times(1)
452         .WillOnce(SaveArg<0>(&storedCallback));
453     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::ack,
454                                         this,
455                                         std::placeholders::_1,
456                                         std::placeholders::_2),
457                               defaultNamespace,
458                               { });
459     expectAckError(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED));
460     storedCallback();
461 }
462
463 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ContextErrorInConnectArmsRetryTimer)
464 {
465     InSequence dummy;
466     expectationsUntilConnect();
467     expectArmConnectionRetryTimer();
468     ac.err = 123;
469     dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
470                                                        "host",
471                                                        htons(6379U),
472                                                        contentsBuilderMock,
473                                                        false,
474                                                        hiredisSystemMock,
475                                                        adapterMock,
476                                                        logger,
477                                                        false));
478     expectDisarmConnectionRetryTimer();
479 }
480
481 TEST_F(AsyncHiredisCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
482 {
483     InSequence dummy;
484     expectRedisAsyncConnectReturnNullptr();
485     expectArmConnectionRetryTimer();
486     expectDisarmConnectionRetryTimer();
487
488     dispatcher.reset(new AsyncHiredisCommandDispatcher(engineMock,
489                                                        "host",
490                                                        htons(6379U),
491                                                        contentsBuilderMock,
492                                                        false,
493                                                        hiredisSystemMock,
494                                                        adapterMock,
495                                                        logger,
496                                                        false));
497 }
498
499 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, FailedCommandListQueryArmsRetryTimer)
500 {
501     InSequence dummy;
502     expectCommandListQueryReturnError();
503     expectArmConnectionVerificationRetryTimer();
504     expectRedisAsyncFree();
505     connected(&ac, 0);
506     expectDisarmConnectionRetryTimer();
507 }
508
509 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ErrorInConnectedCallbackArmsRetryTimer)
510 {
511     InSequence dummy;
512     expectArmConnectionRetryTimer();
513     connected(&ac, -1);
514     expectDisarmConnectionRetryTimer();
515 }
516
517 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectionSucceedsWithRetryTimer)
518 {
519     InSequence dummy;
520     expectArmConnectionRetryTimer();
521
522     connected(&ac, -1);
523
524     expectationsUntilConnect();
525     expectAdapterAttach();
526     expectRedisAsyncSetConnectCallback();
527     expectRedisAsyncSetDisconnectCallback();
528
529     savedConnectionRetryTimerCallback();
530
531     expectCommandListQuery();
532     expectConnectAck();
533
534     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
535     connected(&ac, 0);
536
537     expectRedisAsyncFree();
538 }
539
540 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledOnceConnected)
541 {
542     InSequence dummy;
543     expectCommandListQuery();
544     expectConnectAck();
545     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
546     connected(&ac, 0);
547     expectRedisAsyncFree();
548 }
549
550 TEST_F(AsyncHiredisCommandDispatcherDisconnectedTest, ConnectAckCalledIfConnected)
551 {
552     Engine::Callback storedCallback;
553     EXPECT_CALL(engineMock, postCallback(_))
554         .Times(1)
555         .WillOnce(SaveArg<0>(&storedCallback));
556     expectCommandListQuery();
557     connected(&ac, 0);
558     dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisCommandDispatcherDisconnectedTest::connectAck, this));
559     expectConnectAck();
560     storedCallback();
561     expectRedisAsyncFree();
562 }
563
564 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanDispatchCommands)
565 {
566     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
567         .Times(1)
568         .WillOnce(Invoke([this](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
569                                 int argc, const char** argv, const size_t* argvlen)
570                          {
571                              EXPECT_EQ((int)contents.stack.size(), argc);
572                              EXPECT_EQ(contents.sizes[0], argvlen[0]);
573                              EXPECT_EQ(contents.sizes[1], argvlen[1]);
574                              EXPECT_EQ(contents.sizes[2], argvlen[2]);
575                              EXPECT_EQ(contents.sizes[3], argvlen[3]);
576                              EXPECT_EQ(contents.sizes[4], argvlen[4]);
577                              EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
578                              EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
579                              EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
580                              EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
581                              EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
582                              cb(ac, &redisReplyBuilder.buildNilReply(), pd);
583                              return REDIS_OK;
584                          }));
585     expectAck();
586     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
587                                         this,
588                                         std::placeholders::_1,
589                                         std::placeholders::_2),
590                               defaultNamespace,
591                               contents);
592 }
593
594 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseNilReply)
595 {
596     expectRedisAsyncCommandArgv(redisReplyBuilder.buildNilReply());
597     EXPECT_CALL(*this, ack(std::error_code(), _))
598         .Times(1)
599         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
600                          {
601                              EXPECT_EQ(Reply::Type::NIL, reply.getType());
602                              EXPECT_EQ(0, reply.getInteger());
603                              EXPECT_TRUE(reply.getString()->str.empty());
604                              EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
605                              EXPECT_TRUE(reply.getArray()->empty());
606                          }));
607     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
608                                         this,
609                                         std::placeholders::_1,
610                                         std::placeholders::_2),
611                                         defaultNamespace,
612                                         contents);
613 }
614
615 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseIntegerReply)
616 {
617     expectRedisAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
618     EXPECT_CALL(*this, ack(std::error_code(), _))
619         .Times(1)
620         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
621                          {
622                              auto expected(redisReplyBuilder.buildIntegerReply());
623                              EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
624                              EXPECT_EQ(expected.integer, reply.getInteger());
625                          }));
626     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
627                                         this,
628                                         std::placeholders::_1,
629                                         std::placeholders::_2),
630                                         defaultNamespace,
631                                         contents);
632 }
633
634 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStatusReply)
635 {
636     expectRedisAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
637     EXPECT_CALL(*this, ack(std::error_code(), _))
638         .Times(1)
639         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
640                          {
641                              auto expected(redisReplyBuilder.buildStatusReply());
642                              EXPECT_EQ(Reply::Type::STATUS, reply.getType());
643                              EXPECT_EQ(expected.len, reply.getString()->len);
644                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
645                          }));
646     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
647                                         this,
648                                         std::placeholders::_1,
649                                         std::placeholders::_2),
650                                         defaultNamespace,
651                                         contents);
652 }
653
654 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseStringReply)
655 {
656     expectRedisAsyncCommandArgv(redisReplyBuilder.buildStringReply());
657     EXPECT_CALL(*this, ack(std::error_code(), _))
658         .Times(1)
659         .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
660                          {
661                              auto expected(redisReplyBuilder.buildStringReply());
662                              EXPECT_EQ(Reply::Type::STRING, reply.getType());
663                              EXPECT_EQ(expected.len, reply.getString()->len);
664                              EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
665                          }));
666     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
667                                         this,
668                                         std::placeholders::_1,
669                                         std::placeholders::_2),
670                                         defaultNamespace,
671                                         contents);
672 }
673
674 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanParseArrayReply)
675 {
676     expectRedisAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
677     EXPECT_CALL(*this, ack(std::error_code(), _))
678         .Times(1)
679         .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
680                          {
681                              auto array(reply.getArray());
682                              EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
683                              EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
684                              EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
685                          }));
686     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
687                                         this,
688                                         std::placeholders::_1,
689                                         std::placeholders::_2),
690                                         defaultNamespace,
691                                         contents);
692 }
693
694 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
695 {
696     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
697         .Times(1)
698         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn*, void*, int, const char**, const size_t*)
699                          {
700                              ac->err = REDIS_ERR;
701                              return REDIS_ERR;
702                          }));
703     Engine::Callback storedCallback;
704     EXPECT_CALL(engineMock, postCallback(_))
705         .Times(1)
706         .WillOnce(SaveArg<0>(&storedCallback));
707     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
708                                         this,
709                                         std::placeholders::_1,
710                                         std::placeholders::_2),
711                                         defaultNamespace,
712                                         contents);
713     expectAckError();
714     storedCallback();
715 }
716
717 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
718 {
719     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
720         .Times(1)
721         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
722                          {
723                              cb(ac, nullptr, pd);
724                              return REDIS_OK;
725                          }));
726     expectAckError();
727     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
728                                         this,
729                                         std::placeholders::_1,
730                                         std::placeholders::_2),
731                                         defaultNamespace,
732                                         contents);
733 }
734
735 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
736 {
737     expectReplyError("LOADING Redis is loading the dataset in memory");
738     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
739         .Times(1);
740     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
741                                         this,
742                                         std::placeholders::_1,
743                                         std::placeholders::_2),
744                                         defaultNamespace,
745                                         contents);
746 }
747
748 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
749 {
750     expectReplyError("ERR Protocol error: invalid bulk length");
751     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
752         .Times(1);
753     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
754                                         this,
755                                         std::placeholders::_1,
756                                         std::placeholders::_2),
757                                         defaultNamespace,
758                                         contents);
759 }
760
761 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
762 {
763     expectReplyError("something sinister");
764     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
765         .Times(1);
766     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
767                                         this,
768                                         std::placeholders::_1,
769                                         std::placeholders::_2),
770                                         defaultNamespace,
771                                         contents);
772 }
773
774 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContext)
775 {
776     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
777         .Times(1)
778         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
779                          {
780                              ac->err = REDIS_ERR_IO;
781                              errno = EINVAL;
782                              cb(ac, nullptr, pd);
783                              return REDIS_OK;
784                          }));
785     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
786         .Times(1);
787     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
788                                         this,
789                                         std::placeholders::_1,
790                                         std::placeholders::_2),
791                                         defaultNamespace,
792                                         contents);
793 }
794
795 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, IOErrorInContextWithECONNRESETerrnoValue)
796 {
797     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
798         .Times(1)
799         .WillOnce(Invoke([](redisAsyncContext* ac, redisCallbackFn* cb, void* pd, int, const char**, const size_t*)
800                          {
801                              ac->err = REDIS_ERR_IO;
802                              errno = ECONNRESET;
803                              cb(ac, nullptr, pd);
804                              return REDIS_OK;
805                          }));
806     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
807         .Times(1);
808     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
809                                         this,
810                                         std::placeholders::_1,
811                                         std::placeholders::_2),
812                                         defaultNamespace,
813                                         contents);
814 }
815
816 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, EofErrorInContext)
817 {
818     expectContextError(REDIS_ERR_EOF);
819     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
820         .Times(1);
821     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
822                                         this,
823                                         std::placeholders::_1,
824                                         std::placeholders::_2),
825                                         defaultNamespace,
826                                         contents);
827 }
828
829 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, ProtocolErrorInContext)
830 {
831     expectContextError(REDIS_ERR_PROTOCOL);
832     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
833         .Times(1);
834     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
835                                         this,
836                                         std::placeholders::_1,
837                                         std::placeholders::_2),
838                                         defaultNamespace,
839                                         contents);
840 }
841
842 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, OomErrorInContext)
843 {
844     expectContextError(REDIS_ERR_OOM);
845     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
846         .Times(1);
847     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
848                                         this,
849                                         std::placeholders::_1,
850                                         std::placeholders::_2),
851                                         defaultNamespace,
852                                         contents);
853 }
854
855 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
856 {
857     expectContextError(REDIS_ERR_OTHER);
858     EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
859         .Times(1);
860     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
861                                         this,
862                                         std::placeholders::_1,
863                                         std::placeholders::_2),
864                                         defaultNamespace,
865                                         contents);
866 }
867
868 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
869 {
870     InSequence dummy;
871     expectRedisAsyncCommandArgv_SaveCb();
872     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
873                                         this,
874                                         std::placeholders::_1,
875                                         std::placeholders::_2),
876                                         defaultNamespace,
877                                         contents);
878     expectAck();
879     savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
880     dispatcher->disableCommandCallbacks();
881     expectRedisAsyncCommandArgv_SaveCb();
882     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
883                                         this,
884                                         std::placeholders::_1,
885                                         std::placeholders::_2),
886                                         defaultNamespace,
887                                         contents);
888     expectAckNotCalled();
889     savedCb(&ac, &redisReplyBuilder.buildStringReply(), savedPd);
890 }
891
892 TEST_F(AsyncHiredisCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
893 {
894     InSequence dummy;
895     dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::disconnectCallback,
896                                              this));
897     expectDisconnectCallback();
898     expectArmConnectionRetryTimer();
899     disconnected(&ac, 0);
900     expectDisarmConnectionRetryTimer();
901     expectCommandListQuery();
902     connected(&ac, 0); // restore connection to meet destructor expectations
903 }
904
905 TEST_F(AsyncHiredisCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
906 {
907     InSequence dummy;
908     redisCallbackFn* savedCb;
909     void* savedPd;
910     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
911     expectCommandListQuery();
912     connected(&ac, 0);
913     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
914         .Times(1)
915         .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
916                                               int, const char**, const size_t*)
917                          {
918                              savedCb = cb;
919                              savedPd = pd;
920                              return REDIS_OK;
921                          }));
922     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
923                                         this,
924                                         std::placeholders::_1,
925                                         std::placeholders::_2),
926                                         defaultNamespace,
927                                         contents);
928     EXPECT_CALL(*this, ack(std::error_code(), _))
929             .Times(3);
930     redisReply rr;
931     rr.type = REDIS_REPLY_NIL;
932     savedCb(&ac, &rr, savedPd);
933     savedCb(&ac, &rr, savedPd);
934     savedCb(&ac, &rr, savedPd);
935 }
936
937 TEST_F(AsyncHiredisCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
938 {
939     redisCallbackFn* savedCb;
940     void* savedPd;
941     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
942         .Times(1)
943         .WillOnce(Invoke([this, &savedCb, &savedPd](redisAsyncContext* ac, redisCallbackFn* cb, void* pd,
944                                                     int, const char**, const size_t*)
945                          {
946                              savedCb = cb;
947                              savedPd = pd;
948                              cb(ac, &redisReplyBuilder.buildNilReply(), pd);
949                              return REDIS_OK;
950                          }));
951     expectAck();
952     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherDeathTest::ack,
953                                         this,
954                                         std::placeholders::_1,
955                                         std::placeholders::_2),
956                                         defaultNamespace,
957                                         contents);
958     EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
959 }
960
961 TEST_F(AsyncHiredisCommandDispatcherDeathTest, TooManyRepliesAborts)
962 {
963     InSequence dummy;
964     redisCallbackFn* savedCb;
965     void* savedPd;
966     Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
967     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(&ac, _, _, _, _, _))
968         .Times(1)
969         .WillOnce(Invoke([&savedCb, &savedPd](redisAsyncContext*, redisCallbackFn* cb, void* pd,
970                                               int, const char**, const size_t*)
971                          {
972                              savedCb = cb;
973                              savedPd = pd;
974                              return REDIS_OK;
975                          }));
976     expectAck();
977     dispatcher->dispatchAsync(std::bind(&AsyncHiredisCommandDispatcherConnectedTest::ack,
978                                         this,
979                                         std::placeholders::_1,
980                                         std::placeholders::_2),
981                                         defaultNamespace,
982                                         contents);
983     savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd);
984     EXPECT_EXIT(savedCb(&ac, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
985 }
986
987 TEST_F(AsyncHiredisCommandDispatcherForSentinelTest, CommandListInquiryIsNotSent)
988 {
989     EXPECT_CALL(hiredisSystemMock, redisAsyncCommandArgv(_, _, _, _, _, _))
990         .Times(0);
991     connected(&ac, 0);
992 }