Add first version
[ric-plt/sdl.git] / src / redis / asynchiredisclustercommanddispatcher.cpp
diff --git a/src/redis/asynchiredisclustercommanddispatcher.cpp b/src/redis/asynchiredisclustercommanddispatcher.cpp
new file mode 100644 (file)
index 0000000..7270886
--- /dev/null
@@ -0,0 +1,334 @@
+/*
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
+#include <algorithm>
+#include <cstring>
+#include <cerrno>
+#include <sstream>
+#include "private/abort.hpp"
+#include "private/createlogger.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asyncredisreply.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/hiredisclustersystem.hpp"
+#include "private/engine.hpp"
+#include "private/redis/hiredisclusterepolladapter.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/redisgeneral.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+    void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
+    {
+        if (!status)
+        {
+            std::ostringstream msg;
+            msg << "redis cluster instance connected, fd: " << ac->c.fd;
+            logDebugOnce(msg.str());
+        }
+    }
+
+    void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
+    {
+        if (status)
+        {
+            std::ostringstream msg;
+            msg << "redis cluster instance disconnected, fd: " << ac->c.fd
+                << ", status: " << ac->err;
+            logDebugOnce(msg.str());
+        }
+        auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
+        instance->handleDisconnect(ac);
+    }
+
+    void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
+    {
+        auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
+        auto reply(static_cast<redisReply*>(rr));
+        auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
+        if (instance->isClientCallbacksEnabled())
+            instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
+    }
+}
+
+AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
+                                                                           const boost::optional<std::string>& ns,
+                                                                           const DatabaseConfiguration::Addresses& addresses,
+                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                                           bool usePermanentCommandCallbacks,
+                                                                           std::shared_ptr<Logger> logger):
+    AsyncHiredisClusterCommandDispatcher(engine,
+                                         ns,
+                                         addresses,
+                                         contentsBuilder,
+                                         usePermanentCommandCallbacks,
+                                         HiredisClusterSystem::getInstance(),
+                                         std::make_shared<HiredisClusterEpollAdapter>(engine),
+                                         logger)
+{
+}
+
+AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
+                                                                           const boost::optional<std::string>& ns,
+                                                                           const DatabaseConfiguration::Addresses& addresses,
+                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                                           bool usePermanentCommandCallbacks,
+                                                                           HiredisClusterSystem& hiredisClusterSystem,
+                                                                           std::shared_ptr<HiredisClusterEpollAdapter> adapter,
+                                                                           std::shared_ptr<Logger> logger):
+    engine(engine),
+    initialNamespace(ns),
+    addresses(addresses),
+    contentsBuilder(contentsBuilder),
+    usePermanentCommandCallbacks(usePermanentCommandCallbacks),
+    hiredisClusterSystem(hiredisClusterSystem),
+    adapter(adapter),
+    acc(nullptr),
+    serviceState(ServiceState::DISCONNECTED),
+    clientCallbacksEnabled(true),
+    connectionRetryTimer(engine),
+    connectionRetryTimerDuration(std::chrono::seconds(1)),
+    logger(logger)
+{
+    connect();
+}
+
+AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
+{
+    disconnectHiredisCluster();
+}
+
+void AsyncHiredisClusterCommandDispatcher::connect()
+{
+    // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
+    disconnectHiredisCluster();
+    acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
+                                                        HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
+    if (acc == nullptr)
+    {
+        logger->error() << "SDL: connecting to redis cluster failed, null context returned";
+        armConnectionRetryTimer();
+        return;
+    }
+    if (acc->err)
+    {
+        logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
+        armConnectionRetryTimer();
+        return;
+    }
+    acc->data = this;
+    adapter->setup(acc);
+    hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
+    hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
+    verifyConnection();
+}
+
+void AsyncHiredisClusterCommandDispatcher::verifyConnection()
+{
+    /* redisClusterAsyncConnect only queries available cluster nodes but it does
+     * not connect to any cluster node (as it does not know to which node it should connect to).
+     * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
+     * determined to which cluster node this instance will connect to. We do initial operation
+     * to get connection to right redis node established already now. This also verifies that
+     * connection really works. When Redis has max amount of users, it will still accept new
+     * connections but is will close them immediately. Therefore, we need to verify that just
+     * established connection really works.
+     */
+    /* Connection setup/verification is now done by doing redis command list query. Because we anyway
+     * need to verify that Redis has required commands, we can now combine these two operations
+     * (command list query and connection setup/verification). If either one of the functionalities
+     * is not needed in the future and it is removed, remember to still leave the other one.
+     */
+    /* Non namespace-specific command list query can be used for connection setup purposes,
+     * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
+     * commands dispacthed.
+     */
+
+    /* If initial namespace was not given during dispatcher creation (multi namespace API),
+     * verification is sent to hardcoded namespace. This works for verification purposes
+     * because in our environment cluster is configured to operate only if all nodes
+     * are working (so we can do verification to any node). However, this is not optimal
+     * because we do not necessarily connect to such cluster node which will later be
+     * used by client. Also our cluster configuration can change. This needs to be
+     * optimized later (perhaps to connect to all nodes). */
+    std::string nsForVerification;
+    if (initialNamespace)
+        nsForVerification = *initialNamespace;
+    else
+        nsForVerification = "namespace";
+
+    dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
+                            this,
+                            std::placeholders::_1,
+                            std::placeholders::_2),
+                  nsForVerification,
+                  contentsBuilder->build("COMMAND"),
+                  false);
+}
+
+void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
+{
+    if(error)
+    {
+        logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
+                        << error.message();
+        armConnectionRetryTimer();
+    }
+    else
+    {
+        if (checkRedisModuleCommands(parseCommandListReply(reply)))
+            setConnected();
+        else
+            SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
+    }
+}
+
+void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
+{
+    this->connectAck = connectAck;
+    if (serviceState == ServiceState::CONNECTED)
+        engine.postCallback(connectAck);
+}
+
+void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
+{
+    disconnectCallback = disconnectCb;
+}
+
+void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+                                                         const AsyncConnection::Namespace& ns,
+                                                         const Contents& contents)
+{
+    dispatchAsync(commandCb, ns, contents, true);
+}
+
+void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+                                                         const AsyncConnection::Namespace& ns,
+                                                         const Contents& contents,
+                                                         bool checkConnectionState)
+{
+    if (checkConnectionState && serviceState != ServiceState::CONNECTED)
+    {
+        engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
+                                       this,
+                                       commandCb,
+                                       std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
+        return;
+    }
+    cbs.push_back(commandCb);
+    std::vector<const char*> chars;
+    std::transform(contents.stack.begin(), contents.stack.end(),
+                   std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
+    if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
+                                                                 static_cast<int>(contents.stack.size()), &chars[0],
+                                                                 &contents.sizes[0]) != REDIS_OK)
+    {
+        removeCb(cbs.back());
+        engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
+                                       this,
+                                       commandCb,
+                                       getRedisError(acc->err, acc->errstr, nullptr)));
+    }
+}
+
+void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
+{
+    clientCallbacksEnabled = false;
+}
+
+void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
+{
+    commandCb(error, AsyncRedisReply());
+}
+
+void AsyncHiredisClusterCommandDispatcher::setConnected()
+{
+    serviceState = ServiceState::CONNECTED;
+
+    if (connectAck)
+    {
+        connectAck();
+        connectAck = ConnectAck();
+    }
+}
+
+void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
+{
+    connectionRetryTimer.arm(connectionRetryTimerDuration,
+                             [this] () { connect(); });
+
+}
+
+void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
+                                                       const std::error_code& error,
+                                                       const redisReply* rr)
+{
+    if (!isValidCb(commandCb))
+        SHAREDDATALAYER_ABORT("Invalid callback function.");
+    if (error)
+        commandCb(error, AsyncRedisReply());
+    else
+        commandCb(error, AsyncRedisReply(*rr));
+    if (!usePermanentCommandCallbacks)
+        removeCb(commandCb);
+}
+
+bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
+{
+    return clientCallbacksEnabled;
+}
+
+bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
+{
+    for (auto i(cbs.begin()); i != cbs.end(); ++i)
+        if (&*i == &commandCb)
+            return true;
+    return false;
+}
+
+void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
+{
+    for (auto i(cbs.begin()); i != cbs.end(); ++i)
+        if (&*i == &commandCb)
+        {
+            cbs.erase(i);
+            break;
+        }
+}
+
+void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
+{
+    adapter->detach(ac);
+
+    if (disconnectCallback)
+        disconnectCallback();
+}
+
+void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
+{
+    /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
+     * if acc is a valid pointer).
+     */
+    if (serviceState == ServiceState::CONNECTED)
+        hiredisClusterSystem.redisClusterAsyncFree(acc);
+
+    serviceState = ServiceState::DISCONNECTED;
+}