Add Redis Sentinel based database discovery
[ric-plt/sdl.git] / src / redis / asynchirediscommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "private/redis/asynchirediscommanddispatcher.hpp"
18 #include <algorithm>
19 #include <cstring>
20 #include <cerrno>
21 #include <sstream>
22 #include <arpa/inet.h>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/error.hpp"
27 #include "private/logger.hpp"
28 #include "private/redis/asyncredisreply.hpp"
29 #include "private/redis/reply.hpp"
30 #include "private/redis/hiredissystem.hpp"
31 #include "private/redis/hiredisepolladapter.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/redisgeneral.hpp"
34
35 using namespace shareddatalayer;
36 using namespace shareddatalayer::redis;
37
38 namespace
39 {
40     void connectCb(const redisAsyncContext* ac, int status)
41     {
42         bool isConnected = !status;
43         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
44
45         if (isConnected)
46         {
47             std::ostringstream msg;
48             msg << "redis connected, fd: " << ac->c.fd;
49             logInfoOnce(msg.str());
50             instance->verifyConnection();
51         }
52         else
53             instance->setDisconnected();
54
55     }
56
57     void disconnectCb(const redisAsyncContext* ac, int status)
58     {
59         if (status) {
60             std::ostringstream msg;
61             msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
62             logErrorOnce(msg.str());
63         }
64         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
65         instance->setDisconnected();
66     }
67
68     void cb(redisAsyncContext* ac, void* rr, void* pd)
69     {
70         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
71         auto reply(static_cast<redisReply*>(rr));
72         auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
73         if (instance->isClientCallbacksEnabled())
74             instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
75     }
76 }
77
78 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
79                                                              const std::string& address,
80                                                              uint16_t port,
81                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
82                                                              bool usePermanentCommandCallbacks,
83                                                              std::shared_ptr<Logger> logger,
84                                                              bool usedForSentinel):
85     AsyncHiredisCommandDispatcher(engine,
86                                   address,
87                                   port,
88                                   contentsBuilder,
89                                   usePermanentCommandCallbacks,
90                                   HiredisSystem::getHiredisSystem(),
91                                   std::make_shared<HiredisEpollAdapter>(engine),
92                                   logger,
93                                   usedForSentinel)
94 {
95 }
96
97 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
98                                                              const std::string& address,
99                                                              uint16_t port,
100                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
101                                                              bool usePermanentCommandCallbacks,
102                                                              HiredisSystem& hiredisSystem,
103                                                              std::shared_ptr<HiredisEpollAdapter> adapter,
104                                                              std::shared_ptr<Logger> logger,
105                                                              bool usedForSentinel):
106     engine(engine),
107     address(address),
108     port(ntohs(port)),
109     contentsBuilder(contentsBuilder),
110     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
111     hiredisSystem(hiredisSystem),
112     adapter(adapter),
113     ac(nullptr),
114     serviceState(ServiceState::DISCONNECTED),
115     clientCallbacksEnabled(true),
116     connectionRetryTimer(engine),
117     connectionRetryTimerDuration(std::chrono::seconds(1)),
118     connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
119     logger(logger),
120     usedForSentinel(usedForSentinel)
121
122 {
123     connect();
124 }
125
126 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
127 {
128     disconnectHiredis();
129 }
130
131 void AsyncHiredisCommandDispatcher::connect()
132 {
133     ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
134     if (ac == nullptr || ac->err)
135     {
136         setDisconnected();
137         return;
138     }
139     ac->data = this;
140     adapter->attach(ac);
141     hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
142     hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
143 }
144
145 void AsyncHiredisCommandDispatcher::verifyConnection()
146 {
147     if (usedForSentinel)
148         setConnected();
149     else
150     {
151        /* When Redis has max amount of users, it will still accept new connections but will
152         * close them immediately. Therefore, we need to verify that just established connection
153         * really works. This prevents calling client readyAck callback for a connection that
154         * will be terminated immediately.
155         */
156         /* Connection verification is now done by doing redis command list query. Because we anyway
157          * need to verify that Redis has required commands, we can now combine these two operations
158          * (command list query and connection verification). If either one of the functionalities
159          * is not needed in the future and it is removed, remember to still leave the other one.
160          */
161         serviceState = ServiceState::CONNECTION_VERIFICATION;
162         /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
163          * we are spontaneously connected to redis while timer is running. If connection verification
164          * fails, timer is armed again (normal handling in connection verification).
165          */
166         connectionRetryTimer.disarm();
167         dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
168                                 this,
169                                 std::placeholders::_1,
170                                 std::placeholders::_2),
171                       contentsBuilder->build("COMMAND"),
172                       false);
173     }
174 }
175
176 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
177                                                           const redis::Reply& reply)
178 {
179     if(error)
180     {
181         logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
182                         << error.message();
183
184         if (!connectionRetryTimer.isArmed())
185         {
186             /* Typically if connection verification fails, hiredis will call disconnect callback and
187              * whole connection establishment procedure will be restarted via that. To ensure that
188              * we will retry verification even if connection would not be disconnected this timer
189              * is set. If connection is later disconnected, this timer is disarmed (when disconnect
190              * callback handling arms this timer again).
191              */
192             armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
193                                     std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
194         }
195     }
196     else
197     {
198         if (checkRedisModuleCommands(parseCommandListReply(reply)))
199             setConnected();
200         else
201             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
202     }
203 }
204
205 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
206 {
207     this->connectAck = connectAck;
208     if (serviceState == ServiceState::CONNECTED)
209         engine.postCallback(connectAck);
210 }
211
212 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
213 {
214     disconnectCallback = disconnectCb;
215 }
216
217 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
218                                                   const AsyncConnection::Namespace&,
219                                                   const Contents& contents)
220 {
221     dispatchAsync(commandCb, contents, true);
222 }
223
224 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
225                                                   const Contents& contents,
226                                                   bool checkConnectionState)
227 {
228     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
229     {
230         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
231                                        this,
232                                        commandCb,
233                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
234         return;
235     }
236     cbs.push_back(commandCb);
237     std::vector<const char*> chars;
238     std::transform(contents.stack.begin(), contents.stack.end(),
239                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
240     if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
241                                             &chars[0], &contents.sizes[0]) != REDIS_OK)
242     {
243         removeCb(cbs.back());
244         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
245                                        this,
246                                        commandCb,
247                                        getRedisError(ac->err, ac->errstr, nullptr)));
248     }
249 }
250
251 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
252 {
253     clientCallbacksEnabled = false;
254 }
255
256 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
257                                                            const std::error_code& error)
258 {
259     commandCb(error, AsyncRedisReply());
260 }
261
262 void AsyncHiredisCommandDispatcher::setConnected()
263 {
264     serviceState = ServiceState::CONNECTED;
265
266     if (connectAck)
267     {
268         connectAck();
269         connectAck = ConnectAck();
270     }
271 }
272
273 void AsyncHiredisCommandDispatcher::setDisconnected()
274 {
275     serviceState = ServiceState::DISCONNECTED;
276
277     if (disconnectCallback)
278         disconnectCallback();
279
280     armConnectionRetryTimer(connectionRetryTimerDuration,
281                             std::bind(&AsyncHiredisCommandDispatcher::connect, this));
282 }
283
284 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
285                                                 const std::error_code& error,
286                                                 const redisReply* rr)
287 {
288     if (!isValidCb(commandCb))
289         SHAREDDATALAYER_ABORT("Invalid callback function.");
290     if (error)
291         commandCb(error, AsyncRedisReply());
292     else
293         commandCb(error, AsyncRedisReply(*rr));
294     if (!usePermanentCommandCallbacks)
295         removeCb(commandCb);
296 }
297
298 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
299 {
300     return clientCallbacksEnabled;
301 }
302
303 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
304 {
305     for (auto i(cbs.begin()); i != cbs.end(); ++i)
306         if (&*i == &commandCb)
307             return true;
308     return false;
309 }
310
311 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
312 {
313     for (auto i(cbs.begin()); i != cbs.end(); ++i)
314         if (&*i == &commandCb)
315         {
316             cbs.erase(i);
317             break;
318         }
319 }
320
321 void AsyncHiredisCommandDispatcher::disconnectHiredis()
322 {
323     /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
324      * if ac is a valid pointer).
325      */
326     if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
327         hiredisSystem.redisAsyncFree(ac);
328
329     //disconnect callback handler will update serviceState
330 }
331
332 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
333                                                             std::function<void()> retryAction)
334 {
335     connectionRetryTimer.arm(duration,
336                              [retryAction] () { retryAction(); });
337 }