Add first version
[ric-plt/sdl.git] / src / redis / asynchirediscommanddispatcher.cpp
diff --git a/src/redis/asynchirediscommanddispatcher.cpp b/src/redis/asynchirediscommanddispatcher.cpp
new file mode 100644 (file)
index 0000000..e4195a9
--- /dev/null
@@ -0,0 +1,328 @@
+/*
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#include "private/redis/asynchirediscommanddispatcher.hpp"
+#include <algorithm>
+#include <cstring>
+#include <cerrno>
+#include <sstream>
+#include <arpa/inet.h>
+#include "private/abort.hpp"
+#include "private/createlogger.hpp"
+#include "private/engine.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asyncredisreply.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/hiredissystem.hpp"
+#include "private/redis/hiredisepolladapter.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/redisgeneral.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+    void connectCb(const redisAsyncContext* ac, int status)
+    {
+        bool isConnected = !status;
+        auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+
+        if (isConnected)
+        {
+            std::ostringstream msg;
+            msg << "redis connected, fd: " << ac->c.fd;
+            logInfoOnce(msg.str());
+            instance->verifyConnection();
+        }
+        else
+            instance->setDisconnected();
+
+    }
+
+    void disconnectCb(const redisAsyncContext* ac, int status)
+    {
+        if (status) {
+            std::ostringstream msg;
+            msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
+            logErrorOnce(msg.str());
+        }
+        auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+        instance->setDisconnected();
+    }
+
+    void cb(redisAsyncContext* ac, void* rr, void* pd)
+    {
+        auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+        auto reply(static_cast<redisReply*>(rr));
+        auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
+        if (instance->isClientCallbacksEnabled())
+            instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
+    }
+}
+
+AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
+                                                             const std::string& address,
+                                                             uint16_t port,
+                                                             std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                             bool usePermanentCommandCallbacks,
+                                                             std::shared_ptr<Logger> logger):
+    AsyncHiredisCommandDispatcher(engine,
+                                  address,
+                                  port,
+                                  contentsBuilder,
+                                  usePermanentCommandCallbacks,
+                                  HiredisSystem::getHiredisSystem(),
+                                  std::make_shared<HiredisEpollAdapter>(engine),
+                                  logger)
+{
+}
+
+AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
+                                                             const std::string& address,
+                                                             uint16_t port,
+                                                             std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                             bool usePermanentCommandCallbacks,
+                                                             HiredisSystem& hiredisSystem,
+                                                             std::shared_ptr<HiredisEpollAdapter> adapter,
+                                                             std::shared_ptr<Logger> logger):
+    engine(engine),
+    address(address),
+    port(ntohs(port)),
+    contentsBuilder(contentsBuilder),
+    usePermanentCommandCallbacks(usePermanentCommandCallbacks),
+    hiredisSystem(hiredisSystem),
+    adapter(adapter),
+    ac(nullptr),
+    serviceState(ServiceState::DISCONNECTED),
+    clientCallbacksEnabled(true),
+    connectionRetryTimer(engine),
+    connectionRetryTimerDuration(std::chrono::seconds(1)),
+    connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
+    logger(logger)
+
+{
+    connect();
+}
+
+AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
+{
+    disconnectHiredis();
+}
+
+void AsyncHiredisCommandDispatcher::connect()
+{
+    ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
+    if (ac == nullptr || ac->err)
+    {
+        setDisconnected();
+        return;
+    }
+    ac->data = this;
+    adapter->attach(ac);
+    hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
+    hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
+}
+
+void AsyncHiredisCommandDispatcher::verifyConnection()
+{
+   /* When Redis has max amount of users, it will still accept new connections but will
+    * close them immediately. Therefore, we need to verify that just established connection
+    * really works. This prevents calling client readyAck callback for a connection that
+    * will be terminated immediately.
+    */
+    /* Connection verification is now done by doing redis command list query. Because we anyway
+     * need to verify that Redis has required commands, we can now combine these two operations
+     * (command list query and connection verification). If either one of the functionalities
+     * is not needed in the future and it is removed, remember to still leave the other one.
+     */
+    serviceState = ServiceState::CONNECTION_VERIFICATION;
+    /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
+     * we are spontaneously connected to redis while timer is running. If connection verification
+     * fails, timer is armed again (normal handling in connection verification).
+     */
+    connectionRetryTimer.disarm();
+    dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
+                            this,
+                            std::placeholders::_1,
+                            std::placeholders::_2),
+                  contentsBuilder->build("COMMAND"),
+                  false);
+}
+
+void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
+                                                          const redis::Reply& reply)
+{
+    if(error)
+    {
+        logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
+                        << error.message();
+
+        if (!connectionRetryTimer.isArmed())
+        {
+            /* Typically if connection verification fails, hiredis will call disconnect callback and
+             * whole connection establishment procedure will be restarted via that. To ensure that
+             * we will retry verification even if connection would not be disconnected this timer
+             * is set. If connection is later disconnected, this timer is disarmed (when disconnect
+             * callback handling arms this timer again).
+             */
+            armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
+                                    std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
+        }
+    }
+    else
+    {
+        if (checkRedisModuleCommands(parseCommandListReply(reply)))
+            setConnected();
+        else
+            SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
+    }
+}
+
+void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
+{
+    this->connectAck = connectAck;
+    if (serviceState == ServiceState::CONNECTED)
+        engine.postCallback(connectAck);
+}
+
+void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
+{
+    disconnectCallback = disconnectCb;
+}
+
+void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+                                                  const AsyncConnection::Namespace&,
+                                                  const Contents& contents)
+{
+    dispatchAsync(commandCb, contents, true);
+}
+
+void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+                                                  const Contents& contents,
+                                                  bool checkConnectionState)
+{
+    if (checkConnectionState && serviceState != ServiceState::CONNECTED)
+    {
+        engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
+                                       this,
+                                       commandCb,
+                                       std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
+        return;
+    }
+    cbs.push_back(commandCb);
+    std::vector<const char*> chars;
+    std::transform(contents.stack.begin(), contents.stack.end(),
+                   std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
+    if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
+                                            &chars[0], &contents.sizes[0]) != REDIS_OK)
+    {
+        removeCb(cbs.back());
+        engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
+                                       this,
+                                       commandCb,
+                                       getRedisError(ac->err, ac->errstr, nullptr)));
+    }
+}
+
+void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
+{
+    clientCallbacksEnabled = false;
+}
+
+void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
+                                                           const std::error_code& error)
+{
+    commandCb(error, AsyncRedisReply());
+}
+
+void AsyncHiredisCommandDispatcher::setConnected()
+{
+    serviceState = ServiceState::CONNECTED;
+
+    if (connectAck)
+    {
+        connectAck();
+        connectAck = ConnectAck();
+    }
+}
+
+void AsyncHiredisCommandDispatcher::setDisconnected()
+{
+    serviceState = ServiceState::DISCONNECTED;
+
+    if (disconnectCallback)
+        disconnectCallback();
+
+    armConnectionRetryTimer(connectionRetryTimerDuration,
+                            std::bind(&AsyncHiredisCommandDispatcher::connect, this));
+}
+
+void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
+                                                const std::error_code& error,
+                                                const redisReply* rr)
+{
+    if (!isValidCb(commandCb))
+        SHAREDDATALAYER_ABORT("Invalid callback function.");
+    if (error)
+        commandCb(error, AsyncRedisReply());
+    else
+        commandCb(error, AsyncRedisReply(*rr));
+    if (!usePermanentCommandCallbacks)
+        removeCb(commandCb);
+}
+
+bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
+{
+    return clientCallbacksEnabled;
+}
+
+bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
+{
+    for (auto i(cbs.begin()); i != cbs.end(); ++i)
+        if (&*i == &commandCb)
+            return true;
+    return false;
+}
+
+void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
+{
+    for (auto i(cbs.begin()); i != cbs.end(); ++i)
+        if (&*i == &commandCb)
+        {
+            cbs.erase(i);
+            break;
+        }
+}
+
+void AsyncHiredisCommandDispatcher::disconnectHiredis()
+{
+    /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
+     * if ac is a valid pointer).
+     */
+    if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
+        hiredisSystem.redisAsyncFree(ac);
+
+    //disconnect callback handler will update serviceState
+}
+
+void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
+                                                            std::function<void()> retryAction)
+{
+    connectionRetryTimer.arm(duration,
+                             [retryAction] () { retryAction(); });
+}