2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include "private/redis/asynchirediscommanddispatcher.hpp"
22 #include <arpa/inet.h>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/error.hpp"
27 #include "private/logger.hpp"
28 #include "private/redis/asyncredisreply.hpp"
29 #include "private/redis/reply.hpp"
30 #include "private/redis/hiredissystem.hpp"
31 #include "private/redis/hiredisepolladapter.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/redisgeneral.hpp"
35 using namespace shareddatalayer;
36 using namespace shareddatalayer::redis;
40 void connectCb(const redisAsyncContext* ac, int status)
42 bool isConnected = !status;
43 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
47 std::ostringstream msg;
48 msg << "redis connected, fd: " << ac->c.fd;
49 logInfoOnce(msg.str());
50 instance->verifyConnection();
53 instance->setDisconnected();
// Disconnect callback registered with hiredis. Builds a message from the
// context's error code, error string and file descriptor, reports it through
// logErrorOnce() and marks the dispatcher stored in ac->data as disconnected.
// NOTE(review): the embedded numbering skips 59 and 63, so one line on each
// side of the logging statements is missing from this extract. `status` is not
// read in any visible line -- presumably it guards the logging; confirm
// against the complete file.
57 void disconnectCb(const redisAsyncContext* ac, int status)
60 std::ostringstream msg;
61 msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
62 logErrorOnce(msg.str());
// ac->data holds the dispatcher instance that owns this hiredis context.
64 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
65 instance->setDisconnected();
68 void cb(redisAsyncContext* ac, void* rr, void* pd)
70 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
71 auto reply(static_cast<redisReply*>(rr));
72 auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
73 if (instance->isClientCallbacksEnabled())
74 instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
// Convenience constructor: delegates to the constructor that additionally
// takes a HiredisSystem and a HiredisEpollAdapter, supplying default instances
// of both.
// NOTE(review): the embedded numbering skips 80, 85-87 and 91 onwards, so one
// parameter, several forwarded arguments and the constructor body are missing
// from this extract.
78 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
79 const std::string& address,
81 std::shared_ptr<ContentsBuilder> contentsBuilder,
82 bool usePermanentCommandCallbacks,
83 std::shared_ptr<Logger> logger):
84 AsyncHiredisCommandDispatcher(engine,
88 usePermanentCommandCallbacks,
// Defaults: the HiredisSystem returned by getHiredisSystem() and a new epoll
// adapter created for the same engine.
89 HiredisSystem::getHiredisSystem(),
90 std::make_shared<HiredisEpollAdapter>(engine),
// Main constructor: stores the injected collaborators and sets the initial
// state of the dispatcher.
// NOTE(review): the embedded numbering skips 97, 103-105, 109-110 and 116
// onwards, so one parameter, several member initializers and the constructor
// body are missing from this extract.
95 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
96 const std::string& address,
98 std::shared_ptr<ContentsBuilder> contentsBuilder,
99 bool usePermanentCommandCallbacks,
100 HiredisSystem& hiredisSystem,
101 std::shared_ptr<HiredisEpollAdapter> adapter,
102 std::shared_ptr<Logger> logger):
106 contentsBuilder(contentsBuilder),
107 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
108 hiredisSystem(hiredisSystem),
// The dispatcher starts disconnected with client callbacks enabled.
111 serviceState(ServiceState::DISCONNECTED),
112 clientCallbacksEnabled(true),
113 connectionRetryTimer(engine),
// Reconnect attempts are retried after one second; connection verification is
// retried after ten seconds.
114 connectionRetryTimerDuration(std::chrono::seconds(1)),
115 connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
// Destructor.
// NOTE(review): the body is not part of this extract; confirm what it releases
// against the complete file.
122 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
// Starts an asynchronous hiredis connection to address:port and registers the
// connect and disconnect callbacks on the resulting context.
// NOTE(review): the embedded numbering skips 131-136, so the handling of a
// failed connect and whatever happens before the callbacks are registered are
// missing from this extract.
127 void AsyncHiredisCommandDispatcher::connect()
129 ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
// Failure is either no context at all or a context with its error flag set.
130 if (ac == nullptr || ac->err)
137 hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
138 hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
141 void AsyncHiredisCommandDispatcher::verifyConnection()
143 /* When Redis has max amount of users, it will still accept new connections but will
144 * close them immediately. Therefore, we need to verify that just established connection
145 * really works. This prevents calling client readyAck callback for a connection that
146 * will be terminated immediately.
148 /* Connection verification is now done by doing redis command list query. Because we anyway
149 * need to verify that Redis has required commands, we can now combine these two operations
150 * (command list query and connection verification). If either one of the functionalities
151 * is not needed in the future and it is removed, remember to still leave the other one.
153 serviceState = ServiceState::CONNECTION_VERIFICATION;
154 /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
155 * we are spontaneously connected to redis while timer is running. If connection verification
156 * fails, timer is armed again (normal handling in connection verification).
158 connectionRetryTimer.disarm();
159 dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
161 std::placeholders::_1,
162 std::placeholders::_2),
163 contentsBuilder->build("COMMAND"),
// Callback for the "COMMAND" query that verifyConnection() sends.
// The visible lines log a verification failure, arm the retry timer to run
// verifyConnection() again after connectionVerificationRetryTimerDuration when
// it is not already armed, and check the parsed command list with
// checkRedisModuleCommands(), aborting when required commands are unavailable.
// NOTE(review): the embedded numbering skips 169-171, 173-174, 185-188,
// 190-191 and 193 onwards, so the branch keywords, the rest of the log
// statement and the success path are missing from this extract. Presumably the
// logging and re-arming run only when `error` is set -- confirm against the
// complete file.
167 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
168 const redis::Reply& reply)
172 logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
// Do not re-arm a timer that is already running.
175 if (!connectionRetryTimer.isArmed())
// NOTE(review): the closing "*/" of the block comment below is not visible in
// this extract.
177 /* Typically if connection verification fails, hiredis will call disconnect callback and
178 * whole connection establishment procedure will be restarted via that. To ensure that
179 * we will retry verification even if connection would not be disconnected this timer
180 * is set. If connection is later disconnected, this timer is disarmed (when disconnect
181 * callback handling arms this timer again).
183 armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
184 std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
189 if (checkRedisModuleCommands(parseCommandListReply(reply)))
192 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
196 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
198 this->connectAck = connectAck;
199 if (serviceState == ServiceState::CONNECTED)
200 engine.postCallback(connectAck);
203 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
205 disconnectCallback = disconnectCb;
208 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
209 const AsyncConnection::Namespace&,
210 const Contents& contents)
212 dispatchAsync(commandCb, contents, true);
215 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
216 const Contents& contents,
217 bool checkConnectionState)
219 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
221 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
224 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
227 cbs.push_back(commandCb);
228 std::vector<const char*> chars;
229 std::transform(contents.stack.begin(), contents.stack.end(),
230 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
231 if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
232 &chars[0], &contents.sizes[0]) != REDIS_OK)
234 removeCb(cbs.back());
235 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
238 getRedisError(ac->err, ac->errstr, nullptr)));
242 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
244 clientCallbacksEnabled = false;
247 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
248 const std::error_code& error)
250 commandCb(error, AsyncRedisReply());
// Moves the dispatcher to CONNECTED state and resets the stored connect
// acknowledgement callback to an empty ConnectAck.
// NOTE(review): the embedded numbering skips 257-259, so the lines between the
// state change and the reset are missing from this extract. Presumably the
// stored connectAck is invoked there when one is set -- confirm against the
// complete file.
253 void AsyncHiredisCommandDispatcher::setConnected()
255 serviceState = ServiceState::CONNECTED;
// Clear the stored callback.
260 connectAck = ConnectAck();
264 void AsyncHiredisCommandDispatcher::setDisconnected()
266 serviceState = ServiceState::DISCONNECTED;
268 if (disconnectCallback)
269 disconnectCallback();
271 armConnectionRetryTimer(connectionRetryTimerDuration,
272 std::bind(&AsyncHiredisCommandDispatcher::connect, this));
// Delivers a hiredis reply (or error) to the command callback it belongs to.
// The process is aborted when commandCb is not one of the callbacks stored in
// this dispatcher (see isValidCb()).
// NOTE(review): the embedded numbering skips 281, 283 and 286 onwards, so the
// condition that selects between the two callback invocations and the
// statement controlled by the usePermanentCommandCallbacks check are missing
// from this extract.
275 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
276 const std::error_code& error,
277 const redisReply* rr)
279 if (!isValidCb(commandCb))
280 SHAREDDATALAYER_ABORT("Invalid callback function.");
// Variant without reply data: the callback gets a default-constructed reply.
282 commandCb(error, AsyncRedisReply());
// Variant with reply data: the hiredis reply is wrapped in an AsyncRedisReply.
284 commandCb(error, AsyncRedisReply(*rr));
285 if (!usePermanentCommandCallbacks)
289 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
291 return clientCallbacksEnabled;
294 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
296 for (auto i(cbs.begin()); i != cbs.end(); ++i)
297 if (&*i == &commandCb)
302 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
304 for (auto i(cbs.begin()); i != cbs.end(); ++i)
305 if (&*i == &commandCb)
312 void AsyncHiredisCommandDispatcher::disconnectHiredis()
314 /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
315 * if ac is a valid pointer).
317 if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
318 hiredisSystem.redisAsyncFree(ac);
320 //disconnect callback handler will update serviceState
323 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
324 std::function<void()> retryAction)
326 connectionRetryTimer.arm(duration,
327 [retryAction] () { retryAction(); });