2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
22 #include "private/abort.hpp"
23 #include "private/createlogger.hpp"
24 #include "private/error.hpp"
25 #include "private/logger.hpp"
26 #include "private/redis/asyncredisreply.hpp"
27 #include "private/redis/reply.hpp"
28 #include "private/redis/hiredisclustersystem.hpp"
29 #include "private/engine.hpp"
30 #include "private/redis/hiredisclusterepolladapter.hpp"
31 #include "private/redis/contents.hpp"
32 #include "private/redis/redisgeneral.hpp"
34 using namespace shareddatalayer;
35 using namespace shareddatalayer::redis;
// Connect callback; connect() registers it with redisClusterAsyncSetConnectCallback.
// Builds a debug message containing the file descriptor of the connected redis
// instance (ac->c.fd) and emits it through logDebugOnce.
// The cluster context parameter is unnamed and unused.
// NOTE(review): status is not referenced by the statements visible here;
// presumably the log is guarded by it - confirm.
39 void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
43 std::ostringstream msg;
44 msg << "redis cluster instance connected, fd: " << ac->c.fd;
45 logDebugOnce(msg.str());
// Disconnect callback; connect() registers it with redisClusterAsyncSetDisconnectCallback.
// Logs the file descriptor of the disconnected redis instance and then forwards
// the event to the dispatcher object stored in acc->data.
// NOTE(review): status is not referenced by the statements visible here;
// presumably the log is guarded by it - confirm.
49 void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
53 std::ostringstream msg;
// The value printed under the status label is ac->err, not the status parameter.
54 msg << "redis cluster instance disconnected, fd: " << ac->c.fd
55 << ", status: " << ac->err;
56 logDebugOnce(msg.str());
// acc->data is cast back to the dispatcher instance that handles the disconnect.
58 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
59 instance->handleDisconnect(ac);
// Reply callback passed to redisClusterAsyncCommandArgvWithKey by dispatchAsync.
//   acc - cluster context; acc->data is cast to the dispatcher instance.
//   rr  - reply, cast to redisReply.
//   pd  - callback data, cast to a pointer to CommandCb (dispatchAsync passes
//         the address of the entry it stored in cbs).
// The reply is forwarded to handleReply only while client callbacks are enabled.
62 void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
64 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
65 auto reply(static_cast<redisReply*>(rr));
// This local named cb shadows the enclosing function name inside the body.
66 auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
67 if (instance->isClientCallbacksEnabled())
// The error code is derived from the context error state and the reply by getRedisError.
68 instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
// Convenience constructor that delegates to the constructor overload taking a
// HiredisClusterSystem and a HiredisClusterEpollAdapter.
// It supplies HiredisClusterSystem::getInstance() as the hiredis cluster system
// and a newly created HiredisClusterEpollAdapter constructed from engine.
// NOTE(review): only part of the forwarded argument list is visible here;
// the remaining parameters are presumably passed through unchanged - confirm.
72 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
73 const boost::optional<std::string>& ns,
74 const DatabaseConfiguration::Addresses& addresses,
75 std::shared_ptr<ContentsBuilder> contentsBuilder,
76 bool usePermanentCommandCallbacks,
77 std::shared_ptr<Logger> logger):
78 AsyncHiredisClusterCommandDispatcher(engine,
82 usePermanentCommandCallbacks,
83 HiredisClusterSystem::getInstance(),
84 std::make_shared<HiredisClusterEpollAdapter>(engine),
// Constructor overload that receives the hiredis cluster system and the epoll
// adapter explicitly instead of creating them itself.
// Initial state set by the visible initializers:
//   - serviceState is DISCONNECTED
//   - client callbacks are enabled
//   - the connection retry timer is bound to engine with a duration of one second
// NOTE(review): initializers for the remaining members and the constructor body
// are not visible here; whether connect() is started from the body is to be confirmed.
89 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
90 const boost::optional<std::string>& ns,
91 const DatabaseConfiguration::Addresses& addresses,
92 std::shared_ptr<ContentsBuilder> contentsBuilder,
93 bool usePermanentCommandCallbacks,
94 HiredisClusterSystem& hiredisClusterSystem,
95 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
96 std::shared_ptr<Logger> logger):
100 contentsBuilder(contentsBuilder),
101 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
102 hiredisClusterSystem(hiredisClusterSystem),
105 serviceState(ServiceState::DISCONNECTED),
106 clientCallbacksEnabled(true),
107 connectionRetryTimer(engine),
108 connectionRetryTimerDuration(std::chrono::seconds(1)),
// Destructor: delegates teardown of the redis cluster context to
// disconnectHiredisCluster, which frees the context only in CONNECTED state.
114 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
116 disconnectHiredisCluster();
// Starts (or restarts) the connection to the redis cluster.
// Steps visible here:
//   1. tear down a possibly existing context through disconnectHiredisCluster,
//   2. request a new async cluster context for the configured addresses
//      (converted by formatToClusterSyntax) with HIRCLUSTER_FLAG_ROUTE_USE_SLOTS,
//   3. on failure, log an error and arm the retry timer, whose expiry calls connect again,
//   4. otherwise register connectCb and disconnectCb on the new context.
// NOTE(review): the conditions selecting the two failure branches are not visible
// here; judging by the log texts they are a null context and a non-zero acc->err - confirm.
119 void AsyncHiredisClusterCommandDispatcher::connect()
121 // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
122 disconnectHiredisCluster();
123 acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
124 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
// Failure case: no context was returned.
127 logger->error() << "SDL: connecting to redis cluster failed, null context returned";
128 armConnectionRetryTimer();
// Failure case: a context was returned but it reports an error code.
133 logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
134 armConnectionRetryTimer();
// Hook up the file-local callbacks so that per-instance connects and disconnects are observed.
139 hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
140 hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
// Verifies the connection by dispatching a COMMAND query through dispatchAsync;
// the reply is delivered to verifyConnectionReply.
// The query is keyed by initialNamespace when it is set; the only other value
// assigned to the key here is the literal namespace string below.
144 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
146 /* redisClusterAsyncConnect only queries available cluster nodes but it does
147 * not connect to any cluster node (as it does not know which node it should connect to).
148 * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
149 * determined to which cluster node this instance will connect to. We do initial operation
150 * to get connection to right redis node established already now. This also verifies that
151 * connection really works. When Redis has max amount of users, it will still accept new
152 * connections but it will close them immediately. Therefore, we need to verify that just
153 * established connection really works.
155 /* Connection setup/verification is now done by doing redis command list query. Because we anyway
156 * need to verify that Redis has required commands, we can now combine these two operations
157 * (command list query and connection setup/verification). If either one of the functionalities
158 * is not needed in the future and it is removed, remember to still leave the other one.
160 /* Non namespace-specific command list query can be used for connection setup purposes,
161 * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
162 * commands dispatched.
165 /* If initial namespace was not given during dispatcher creation (multi namespace API),
166 * verification is sent to hardcoded namespace. This works for verification purposes
167 * because in our environment cluster is configured to operate only if all nodes
168 * are working (so we can do verification to any node). However, this is not optimal
169 * because we do not necessarily connect to such cluster node which will later be
170 * used by client. Also our cluster configuration can change. This needs to be
171 * optimized later (perhaps to connect to all nodes). */
172 std::string nsForVerification;
173 if (initialNamespace)
174 nsForVerification = *initialNamespace;
// Hardcoded key used when no initial namespace was given.
176 nsForVerification = "namespace";
// The reply handler is bound with two placeholders: the error code and the reply.
178 dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
180 std::placeholders::_1,
181 std::placeholders::_2),
183 contentsBuilder->build("COMMAND"),
// Completion handler for the COMMAND query issued by verifyConnection.
//   error - result code of the query.
//   reply - reply holding the redis command list.
// Visible behaviour: a verification failure is logged and the connection retry
// timer is armed; the command list parsed from the reply is checked with
// checkRedisModuleCommands, and the process is aborted with the message below
// when the required module extension commands are not available.
// NOTE(review): the branch conditions and the success action are not visible
// here; presumably the dispatcher is marked connected on success - confirm.
187 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
191 logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
193 armConnectionRetryTimer();
197 if (checkRedisModuleCommands(parseCommandListReply(reply)))
200 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
// Stores connectAck as the pending connect acknowledgement.
// When the dispatcher is already in CONNECTED state the acknowledgement is
// posted to the engine immediately instead of waiting for a later state change.
204 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
206 this->connectAck = connectAck;
207 if (serviceState == ServiceState::CONNECTED)
208 engine.postCallback(connectAck);
// Stores the callback that handleDisconnect invokes when it is set.
// A later registration replaces the earlier one.
211 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
213 disconnectCallback = disconnectCb;
// Dispatches a command with connection state checking enabled.
// Forwards to the four argument overload with checkConnectionState set to true.
//   commandCb - completion callback for the command.
//   ns        - namespace used as the key of the command.
//   contents  - command arguments.
216 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
217 const AsyncConnection::Namespace& ns,
218 const Contents& contents)
220 dispatchAsync(commandCb, ns, contents, true);
// Dispatches one command to the redis cluster, keyed by namespace ns.
//   commandCb            - completion callback; a copy is stored in cbs.
//   ns                   - key passed to redisClusterAsyncCommandArgvWithKey.
//   contents             - argument strings (contents.stack) and their sizes (contents.sizes).
//   checkConnectionState - when true, the command is refused unless the state is CONNECTED.
// Errors are never reported synchronously: they are delivered by posting
// callCommandCbWithError to the engine.
223 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
224 const AsyncConnection::Namespace& ns,
225 const Contents& contents,
226 bool checkConnectionState)
228 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
// NOTE(review): an early return after this post is presumably present but not visible here - confirm.
230 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
233 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
// The address of the stored copy is handed to hiredis as callback data below.
// NOTE(review): this relies on cbs keeping element addresses stable across later
// push_back calls - confirm the container type.
236 cbs.push_back(commandCb);
// Collect raw pointers to the argument strings; they reference contents.stack.
237 std::vector<const char*> chars;
238 std::transform(contents.stack.begin(), contents.stack.end(),
239 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
// NOTE(review): indexing element 0 of chars and contents.sizes assumes contents is non-empty - confirm.
240 if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
241 static_cast<int>(contents.stack.size()), &chars[0],
242 &contents.sizes[0]) != REDIS_OK)
// Queuing failed: drop the stored callback and report the context error asynchronously.
244 removeCb(cbs.back());
245 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
248 getRedisError(acc->err, acc->errstr, nullptr)));
// Clears the clientCallbacksEnabled flag. The file-local reply callback cb
// consults this flag through isClientCallbacksEnabled and stops forwarding
// replies to handleReply once it is false.
252 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
254 clientCallbacksEnabled = false;
// Invokes commandCb with the given error code and a default-constructed
// AsyncRedisReply. dispatchAsync posts this function to the engine to report
// dispatch failures.
257 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
259 commandCb(error, AsyncRedisReply());
// Marks the dispatcher as CONNECTED and resets the stored connect
// acknowledgement to an empty ConnectAck.
// NOTE(review): the statements between these two are not visible here;
// presumably a pending connectAck is invoked before it is cleared - confirm.
262 void AsyncHiredisClusterCommandDispatcher::setConnected()
264 serviceState = ServiceState::CONNECTED;
269 connectAck = ConnectAck();
// Arms the connection retry timer with connectionRetryTimerDuration.
// The timer callback captures this and calls connect.
273 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
275 connectionRetryTimer.arm(connectionRetryTimerDuration,
276 [this] () { connect(); });
// Delivers a command reply to the client callback.
//   commandCb - callback to invoke; it must be an entry of cbs (checked by address).
//   error     - error code passed to the callback.
//   rr        - raw hiredis reply.
// Aborts the process when commandCb is not a registered callback.
// NOTE(review): the condition choosing between the empty reply and the wrapped
// reply is not visible here; presumably it is a null check on rr - confirm.
280 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
281 const std::error_code& error,
282 const redisReply* rr)
284 if (!isValidCb(commandCb))
285 SHAREDDATALAYER_ABORT("Invalid callback function.");
// Variant without reply data.
287 commandCb(error, AsyncRedisReply());
// Variant wrapping the hiredis reply.
289 commandCb(error, AsyncRedisReply(*rr));
// NOTE(review): the statement guarded by this condition is not visible here;
// presumably the one-shot callback is removed from cbs - confirm.
290 if (!usePermanentCommandCallbacks)
// Returns the current value of clientCallbacksEnabled, which starts as true
// and is set to false by disableCommandCallbacks.
294 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
296 return clientCallbacksEnabled;
// Searches cbs for commandCb. The comparison is by address, so commandCb must
// refer to the very object stored in cbs; an equal copy does not match.
// NOTE(review): the return statements are not visible here; presumably true is
// returned on a match and false otherwise - confirm.
299 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
301 for (auto i(cbs.begin()); i != cbs.end(); ++i)
302 if (&*i == &commandCb)
// Locates commandCb in cbs by address (identity, not equality).
// NOTE(review): the action taken on a match is not visible here; presumably
// the entry is erased and the loop is left - confirm.
307 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
309 for (auto i(cbs.begin()); i != cbs.end(); ++i)
310 if (&*i == &commandCb)
// Called by disconnectCb when a redis instance disconnects.
// Invokes the callback stored by registerDisconnectCb when one is set.
// NOTE(review): statements preceding the callback check are not visible here,
// so how ac is used cannot be stated - confirm.
317 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
321 if (disconnectCallback)
322 disconnectCallback();
// Frees the redis cluster context, but only when the dispatcher is in CONNECTED
// state, and then records the DISCONNECTED state.
// NOTE(review): connect overwrites acc after calling this function, so a context
// obtained while not CONNECTED is never passed to redisClusterAsyncFree here;
// confirm whether that leaks contexts on repeated connection attempts.
325 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
327 /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
328 * if acc is a valid pointer).
330 if (serviceState == ServiceState::CONNECTED)
331 hiredisClusterSystem.redisClusterAsyncFree(acc);
333 serviceState = ServiceState::DISCONNECTED;