--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
+#include <algorithm>
+#include <cstring>
+#include <cerrno>
+#include <sstream>
+#include "private/abort.hpp"
+#include "private/createlogger.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asyncredisreply.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/hiredisclustersystem.hpp"
+#include "private/engine.hpp"
+#include "private/redis/hiredisclusterepolladapter.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/redisgeneral.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+ void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
+ {
+ if (!status)
+ {
+ std::ostringstream msg;
+ msg << "redis cluster instance connected, fd: " << ac->c.fd;
+ logDebugOnce(msg.str());
+ }
+ }
+
+ void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
+ {
+ if (status)
+ {
+ std::ostringstream msg;
+ msg << "redis cluster instance disconnected, fd: " << ac->c.fd
+ << ", status: " << ac->err;
+ logDebugOnce(msg.str());
+ }
+ auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
+ instance->handleDisconnect(ac);
+ }
+
+ void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
+ {
+ auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
+ auto reply(static_cast<redisReply*>(rr));
+ auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
+ if (instance->isClientCallbacksEnabled())
+ instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
+ }
+}
+
+AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
+ const boost::optional<std::string>& ns,
+ const DatabaseConfiguration::Addresses& addresses,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ bool usePermanentCommandCallbacks,
+ std::shared_ptr<Logger> logger):
+ AsyncHiredisClusterCommandDispatcher(engine,
+ ns,
+ addresses,
+ contentsBuilder,
+ usePermanentCommandCallbacks,
+ HiredisClusterSystem::getInstance(),
+ std::make_shared<HiredisClusterEpollAdapter>(engine),
+ logger)
+{
+}
+
+AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
+ const boost::optional<std::string>& ns,
+ const DatabaseConfiguration::Addresses& addresses,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ bool usePermanentCommandCallbacks,
+ HiredisClusterSystem& hiredisClusterSystem,
+ std::shared_ptr<HiredisClusterEpollAdapter> adapter,
+ std::shared_ptr<Logger> logger):
+ engine(engine),
+ initialNamespace(ns),
+ addresses(addresses),
+ contentsBuilder(contentsBuilder),
+ usePermanentCommandCallbacks(usePermanentCommandCallbacks),
+ hiredisClusterSystem(hiredisClusterSystem),
+ adapter(adapter),
+ acc(nullptr),
+ serviceState(ServiceState::DISCONNECTED),
+ clientCallbacksEnabled(true),
+ connectionRetryTimer(engine),
+ connectionRetryTimerDuration(std::chrono::seconds(1)),
+ logger(logger)
+{
+ connect();
+}
+
+AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
+{
+ disconnectHiredisCluster();
+}
+
+void AsyncHiredisClusterCommandDispatcher::connect()
+{
+ // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
+ disconnectHiredisCluster();
+ acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
+ HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
+ if (acc == nullptr)
+ {
+ logger->error() << "SDL: connecting to redis cluster failed, null context returned";
+ armConnectionRetryTimer();
+ return;
+ }
+ if (acc->err)
+ {
+ logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
+ armConnectionRetryTimer();
+ return;
+ }
+ acc->data = this;
+ adapter->setup(acc);
+ hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
+ hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
+ verifyConnection();
+}
+
+void AsyncHiredisClusterCommandDispatcher::verifyConnection()
+{
+ /* redisClusterAsyncConnect only queries available cluster nodes but it does
+ * not connect to any cluster node (as it does not know to which node it should connect to).
+ * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
+ * determined to which cluster node this instance will connect to. We do initial operation
+ * to get connection to right redis node established already now. This also verifies that
+ * connection really works. When Redis has max amount of users, it will still accept new
+ * connections but is will close them immediately. Therefore, we need to verify that just
+ * established connection really works.
+ */
+ /* Connection setup/verification is now done by doing redis command list query. Because we anyway
+ * need to verify that Redis has required commands, we can now combine these two operations
+ * (command list query and connection setup/verification). If either one of the functionalities
+ * is not needed in the future and it is removed, remember to still leave the other one.
+ */
+ /* Non namespace-specific command list query can be used for connection setup purposes,
+ * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
+ * commands dispacthed.
+ */
+
+ /* If initial namespace was not given during dispatcher creation (multi namespace API),
+ * verification is sent to hardcoded namespace. This works for verification purposes
+ * because in our environment cluster is configured to operate only if all nodes
+ * are working (so we can do verification to any node). However, this is not optimal
+ * because we do not necessarily connect to such cluster node which will later be
+ * used by client. Also our cluster configuration can change. This needs to be
+ * optimized later (perhaps to connect to all nodes). */
+ std::string nsForVerification;
+ if (initialNamespace)
+ nsForVerification = *initialNamespace;
+ else
+ nsForVerification = "namespace";
+
+ dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ nsForVerification,
+ contentsBuilder->build("COMMAND"),
+ false);
+}
+
+void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
+{
+ if(error)
+ {
+ logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
+ << error.message();
+ armConnectionRetryTimer();
+ }
+ else
+ {
+ if (checkRedisModuleCommands(parseCommandListReply(reply)))
+ setConnected();
+ else
+ SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
+ }
+}
+
+void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
+{
+ this->connectAck = connectAck;
+ if (serviceState == ServiceState::CONNECTED)
+ engine.postCallback(connectAck);
+}
+
+void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
+{
+ disconnectCallback = disconnectCb;
+}
+
+void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+ const AsyncConnection::Namespace& ns,
+ const Contents& contents)
+{
+ dispatchAsync(commandCb, ns, contents, true);
+}
+
+void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+ const AsyncConnection::Namespace& ns,
+ const Contents& contents,
+ bool checkConnectionState)
+{
+ if (checkConnectionState && serviceState != ServiceState::CONNECTED)
+ {
+ engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
+ this,
+ commandCb,
+ std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
+ return;
+ }
+ cbs.push_back(commandCb);
+ std::vector<const char*> chars;
+ std::transform(contents.stack.begin(), contents.stack.end(),
+ std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
+ if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
+ static_cast<int>(contents.stack.size()), &chars[0],
+ &contents.sizes[0]) != REDIS_OK)
+ {
+ removeCb(cbs.back());
+ engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
+ this,
+ commandCb,
+ getRedisError(acc->err, acc->errstr, nullptr)));
+ }
+}
+
+void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
+{
+ clientCallbacksEnabled = false;
+}
+
+void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
+{
+ commandCb(error, AsyncRedisReply());
+}
+
+void AsyncHiredisClusterCommandDispatcher::setConnected()
+{
+ serviceState = ServiceState::CONNECTED;
+
+ if (connectAck)
+ {
+ connectAck();
+ connectAck = ConnectAck();
+ }
+}
+
+void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
+{
+ connectionRetryTimer.arm(connectionRetryTimerDuration,
+ [this] () { connect(); });
+
+}
+
+void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
+ const std::error_code& error,
+ const redisReply* rr)
+{
+ if (!isValidCb(commandCb))
+ SHAREDDATALAYER_ABORT("Invalid callback function.");
+ if (error)
+ commandCb(error, AsyncRedisReply());
+ else
+ commandCb(error, AsyncRedisReply(*rr));
+ if (!usePermanentCommandCallbacks)
+ removeCb(commandCb);
+}
+
+bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
+{
+ return clientCallbacksEnabled;
+}
+
+bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
+{
+ for (auto i(cbs.begin()); i != cbs.end(); ++i)
+ if (&*i == &commandCb)
+ return true;
+ return false;
+}
+
+void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
+{
+ for (auto i(cbs.begin()); i != cbs.end(); ++i)
+ if (&*i == &commandCb)
+ {
+ cbs.erase(i);
+ break;
+ }
+}
+
+void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
+{
+ adapter->detach(ac);
+
+ if (disconnectCallback)
+ disconnectCallback();
+}
+
+void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
+{
+ /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
+ * if acc is a valid pointer).
+ */
+ if (serviceState == ServiceState::CONNECTED)
+ hiredisClusterSystem.redisClusterAsyncFree(acc);
+
+ serviceState = ServiceState::DISCONNECTED;
+}