2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include "private/redis/asynchirediscommanddispatcher.hpp"
22 #include <arpa/inet.h>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/error.hpp"
27 #include "private/logger.hpp"
28 #include "private/redis/asyncredisreply.hpp"
29 #include "private/redis/reply.hpp"
30 #include "private/redis/hiredissystem.hpp"
31 #include "private/redis/hiredisepolladapter.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/redisgeneral.hpp"
35 using namespace shareddatalayer;
36 using namespace shareddatalayer::redis;
// hiredis connect callback; connect() registers it with redisAsyncSetConnectCallback.
// The connection is treated as established when `status` is zero (isConnected = !status).
// The owning dispatcher is recovered from ac->data.
// Visible actions: log the connected fd once and start connection verification, or
// mark the dispatcher disconnected.
// NOTE(review): the branch/brace lines are not visible in this view; presumably the
// logging + verifyConnection() path runs when isConnected is true and
// setDisconnected() runs otherwise -- confirm against the full file.
40 void connectCb(const redisAsyncContext* ac, int status)
42 bool isConnected = !status;
43 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
47 std::ostringstream msg;
48 msg << "redis connected, fd: " << ac->c.fd;
49 logInfoOnce(msg.str());
50 instance->verifyConnection();
53 instance->setDisconnected();
// hiredis disconnect callback; connect() registers it with redisAsyncSetDisconnectCallback.
// Logs the context's error code, error string and fd once via logErrorOnce, then
// marks the owning dispatcher (recovered from ac->data) as disconnected.
// NOTE(review): `status` is not referenced on any visible line; a guard around the
// logging may exist on a line not shown in this view -- confirm.
57 void disconnectCb(const redisAsyncContext* ac, int status)
60 std::ostringstream msg;
61 msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
62 logErrorOnce(msg.str());
64 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
65 instance->setDisconnected();
68 void cb(redisAsyncContext* ac, void* rr, void* pd)
70 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
71 auto reply(static_cast<redisReply*>(rr));
72 auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
73 if (instance->isClientCallbacksEnabled())
74 instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
// Convenience constructor: delegates to the constructor that additionally takes a
// HiredisSystem reference and a HiredisEpollAdapter, supplying
// HiredisSystem::getHiredisSystem() and a newly created HiredisEpollAdapter bound to `engine`.
// NOTE(review): some parameter and delegated-argument lines are not visible in this view.
78 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
79 const std::string& address,
81 std::shared_ptr<ContentsBuilder> contentsBuilder,
82 bool usePermanentCommandCallbacks,
83 std::shared_ptr<Logger> logger,
84 bool usedForSentinel):
85 AsyncHiredisCommandDispatcher(engine,
89 usePermanentCommandCallbacks,
90 HiredisSystem::getHiredisSystem(),
91 std::make_shared<HiredisEpollAdapter>(engine),
// Main constructor; the hiredis system wrapper and the epoll adapter are injected by the caller.
// Visible initial state: serviceState is DISCONNECTED, client callbacks are enabled,
// the retry timer is created on `engine`, the reconnect retry interval is 1 second and
// the connection-verification retry interval is 10 seconds.
// NOTE(review): several initializer lines and the constructor body are not visible in
// this view, so it cannot be told from here whether a connection attempt is started.
97 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
98 const std::string& address,
100 std::shared_ptr<ContentsBuilder> contentsBuilder,
101 bool usePermanentCommandCallbacks,
102 HiredisSystem& hiredisSystem,
103 std::shared_ptr<HiredisEpollAdapter> adapter,
104 std::shared_ptr<Logger> logger,
105 bool usedForSentinel):
109 contentsBuilder(contentsBuilder),
110 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
111 hiredisSystem(hiredisSystem),
114 serviceState(ServiceState::DISCONNECTED),
115 clientCallbacksEnabled(true),
116 connectionRetryTimer(engine),
117 connectionRetryTimerDuration(std::chrono::seconds(1)),
118 connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
120 usedForSentinel(usedForSentinel)
// Destructor.
// NOTE(review): the body is not visible in this view; presumably it releases the
// hiredis context (see disconnectHiredis) -- confirm against the full file.
126 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
// Starts an asynchronous hiredis connection to address:port and stores the returned
// context in `ac`. A null context or a context with `err` set is checked for right
// after the call. The file-level connectCb / disconnectCb functions are registered
// on the context as connect and disconnect callbacks.
// NOTE(review): the failure-handling lines and the lines between the check and the
// callback registration are not visible in this view (the callbacks read ac->data,
// so it is presumably assigned there) -- confirm against the full file.
131 void AsyncHiredisCommandDispatcher::connect()
133 ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
134 if (ac == nullptr || ac->err)
141 hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
142 hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
// Verifies a freshly established connection before it is reported as usable.
// Visible steps: move serviceState to CONNECTION_VERIFICATION, disarm the retry timer,
// and dispatch the Redis "COMMAND" query with verifyConnectionReply bound as the reply handler.
// NOTE(review): some lines are not visible in this view (the start of the body, comment
// terminators, remaining bind/dispatch arguments). Since `usedForSentinel` is stored by the
// constructor, a sentinel-specific shortcut may exist on the missing lines -- confirm.
145 void AsyncHiredisCommandDispatcher::verifyConnection()
151 /* When Redis has max amount of users, it will still accept new connections but will
152 * close them immediately. Therefore, we need to verify that just established connection
153 * really works. This prevents calling client readyAck callback for a connection that
154 * will be terminated immediately.
156 /* Connection verification is now done by doing redis command list query. Because we anyway
157 * need to verify that Redis has required commands, we can now combine these two operations
158 * (command list query and connection verification). If either one of the functionalities
159 * is not needed in the future and it is removed, remember to still leave the other one.
161 serviceState = ServiceState::CONNECTION_VERIFICATION;
162 /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
163 * we are spontaneously connected to redis while timer is running. If connection verification
164 * fails, timer is armed again (normal handling in connection verification).
166 connectionRetryTimer.disarm();
167 dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
169 std::placeholders::_1,
170 std::placeholders::_2),
171 contentsBuilder->build("COMMAND"),
// Reply handler for the "COMMAND" query issued by verifyConnection().
// Visible behavior:
//  - a failure path logs "connection verification failed" and, if the retry timer is not
//    already armed, arms it to call verifyConnection() again after
//    connectionVerificationRetryTimerDuration;
//  - otherwise the reply is parsed with parseCommandListReply and checked with
//    checkRedisModuleCommands; the process is aborted with SHAREDDATALAYER_ABORT when the
//    required module commands are reported missing.
// NOTE(review): the branch conditions, the rest of the log statement and the action taken
// when the command check succeeds are not visible in this view -- presumably the failure
// path is selected by `error` and success leads to setConnected(); confirm.
176 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
177 const redis::Reply& reply)
181 logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
184 if (!connectionRetryTimer.isArmed())
186 /* Typically if connection verification fails, hiredis will call disconnect callback and
187 * whole connection establishment procedure will be restarted via that. To ensure that
188 * we will retry verification even if connection would not be disconnected this timer
189 * is set. If connection is later disconnected, this timer is disarmed (when disconnect
190 * callback handling arms this timer again).
192 armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
193 std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
198 if (checkRedisModuleCommands(parseCommandListReply(reply)))
201 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
205 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
207 this->connectAck = connectAck;
208 if (serviceState == ServiceState::CONNECTED)
209 engine.postCallback(connectAck);
212 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
214 disconnectCallback = disconnectCb;
217 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
218 const AsyncConnection::Namespace&,
219 const Contents& contents)
221 dispatchAsync(commandCb, contents, true);
// Sends the Redis command described by `contents` and registers `commandCb` for its reply.
//  - When checkConnectionState is true and the dispatcher is not CONNECTED, a call to
//    callCommandCbWithError with NOT_CONNECTED is posted to the engine.
//  - Otherwise commandCb is copied to the back of `cbs`, and the address of that stored copy
//    is handed to hiredis as the per-command private data (the file-level `cb` function
//    casts it back to a CommandCb pointer).
//  - If redisAsyncCommandArgv does not return REDIS_OK, the stored callback is removed
//    again and an error callback carrying the hiredis context error is posted to the engine.
// NOTE(review): some lines (remaining bind arguments, braces, any early return after the
// NOT_CONNECTED post) are not visible in this view -- confirm against the full file.
224 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
225 const Contents& contents,
226 bool checkConnectionState)
228 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
230 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
233 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
236 cbs.push_back(commandCb);
// hiredis takes parallel arrays: one of argument pointers, one of argument lengths.
// The pointers refer to the strings owned by contents.stack.
// NOTE(review): &chars[0] assumes contents.stack is non-empty -- confirm callers guarantee it.
237 std::vector<const char*> chars;
238 std::transform(contents.stack.begin(), contents.stack.end(),
239 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
240 if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
241 &chars[0], &contents.sizes[0]) != REDIS_OK)
243 removeCb(cbs.back());
244 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
247 getRedisError(ac->err, ac->errstr, nullptr)));
251 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
253 clientCallbacksEnabled = false;
256 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
257 const std::error_code& error)
259 commandCb(error, AsyncRedisReply());
// Moves the dispatcher to CONNECTED state and resets the stored connectAck to an
// empty ConnectAck.
// NOTE(review): the lines between the two visible statements are not shown in this view;
// presumably a pending connectAck is invoked before it is cleared -- confirm.
262 void AsyncHiredisCommandDispatcher::setConnected()
264 serviceState = ServiceState::CONNECTED;
269 connectAck = ConnectAck();
// Moves the dispatcher to DISCONNECTED state, notifies the registered disconnect
// callback (if one is set) and arms the retry timer so that connect() is called again
// after connectionRetryTimerDuration.
// NOTE(review): a few lines of this function are not visible in this view; they are
// most likely blank lines and braces -- confirm against the full file.
273 void AsyncHiredisCommandDispatcher::setDisconnected()
275 serviceState = ServiceState::DISCONNECTED;
277 if (disconnectCallback)
278 disconnectCallback();
280 armConnectionRetryTimer(connectionRetryTimerDuration,
281 std::bind(&AsyncHiredisCommandDispatcher::connect, this));
// Delivers a hiredis reply to the client's command callback.
// Aborts via SHAREDDATALAYER_ABORT when commandCb is not one of the callback objects
// stored in `cbs` (isValidCb compares object addresses).
// The callback is invoked either with an empty AsyncRedisReply or with one constructed
// from *rr; a follow-up action is conditional on usePermanentCommandCallbacks being false.
// NOTE(review): the condition selecting between the two invocations and the statement
// guarded by the last `if` are not visible in this view -- presumably the empty reply is
// used when `error` is set and the stored callback is removed for non-permanent
// callbacks; confirm against the full file.
284 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
285 const std::error_code& error,
286 const redisReply* rr)
288 if (!isValidCb(commandCb))
289 SHAREDDATALAYER_ABORT("Invalid callback function.");
291 commandCb(error, AsyncRedisReply());
293 commandCb(error, AsyncRedisReply(*rr));
294 if (!usePermanentCommandCallbacks)
298 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
300 return clientCallbacksEnabled;
// Checks whether commandCb refers to one of the callback objects stored in `cbs`.
// The comparison is by address (identity of the stored object), not by value.
// NOTE(review): the return statements are not visible in this view; presumably true is
// returned on an address match and false otherwise -- confirm.
303 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
305 for (auto i(cbs.begin()); i != cbs.end(); ++i)
306 if (&*i == &commandCb)
// Searches `cbs` for the stored callback object whose address equals &commandCb.
// NOTE(review): the statements executed on a match are not visible in this view;
// presumably the matching element is erased and the loop stops -- confirm.
311 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
313 for (auto i(cbs.begin()); i != cbs.end(); ++i)
314 if (&*i == &commandCb)
// Frees the hiredis async context, but only while serviceState is CONNECTED or
// CONNECTION_VERIFICATION; in any other state nothing is freed on the visible lines.
// serviceState is not modified here.
// NOTE(review): some lines (comment terminator, braces) are not visible in this view.
321 void AsyncHiredisCommandDispatcher::disconnectHiredis()
323 /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
324 * if ac is a valid pointer).
326 if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
327 hiredisSystem.redisAsyncFree(ac);
329 //disconnect callback handler will update serviceState
332 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
333 std::function<void()> retryAction)
335 connectionRetryTimer.arm(duration,
336 [retryAction] () { retryAction(); });