--- /dev/null
+/*
+ Copyright (c) 2018-2019 Nokia.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include "private/redis/asynchirediscommanddispatcher.hpp"
+#include <algorithm>
+#include <cstring>
+#include <cerrno>
+#include <sstream>
+#include <arpa/inet.h>
+#include "private/abort.hpp"
+#include "private/createlogger.hpp"
+#include "private/engine.hpp"
+#include "private/error.hpp"
+#include "private/logger.hpp"
+#include "private/redis/asyncredisreply.hpp"
+#include "private/redis/reply.hpp"
+#include "private/redis/hiredissystem.hpp"
+#include "private/redis/hiredisepolladapter.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/redisgeneral.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+ void connectCb(const redisAsyncContext* ac, int status)
+ {
+ bool isConnected = !status;
+ auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+
+ if (isConnected)
+ {
+ std::ostringstream msg;
+ msg << "redis connected, fd: " << ac->c.fd;
+ logInfoOnce(msg.str());
+ instance->verifyConnection();
+ }
+ else
+ instance->setDisconnected();
+
+ }
+
+ void disconnectCb(const redisAsyncContext* ac, int status)
+ {
+ if (status) {
+ std::ostringstream msg;
+ msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
+ logErrorOnce(msg.str());
+ }
+ auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+ instance->setDisconnected();
+ }
+
+ void cb(redisAsyncContext* ac, void* rr, void* pd)
+ {
+ auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
+ auto reply(static_cast<redisReply*>(rr));
+ auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
+ if (instance->isClientCallbacksEnabled())
+ instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
+ }
+}
+
+AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
+ const std::string& address,
+ uint16_t port,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ bool usePermanentCommandCallbacks,
+ std::shared_ptr<Logger> logger):
+ AsyncHiredisCommandDispatcher(engine,
+ address,
+ port,
+ contentsBuilder,
+ usePermanentCommandCallbacks,
+ HiredisSystem::getHiredisSystem(),
+ std::make_shared<HiredisEpollAdapter>(engine),
+ logger)
+{
+}
+
+AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
+ const std::string& address,
+ uint16_t port,
+ std::shared_ptr<ContentsBuilder> contentsBuilder,
+ bool usePermanentCommandCallbacks,
+ HiredisSystem& hiredisSystem,
+ std::shared_ptr<HiredisEpollAdapter> adapter,
+ std::shared_ptr<Logger> logger):
+ engine(engine),
+ address(address),
+ port(ntohs(port)),
+ contentsBuilder(contentsBuilder),
+ usePermanentCommandCallbacks(usePermanentCommandCallbacks),
+ hiredisSystem(hiredisSystem),
+ adapter(adapter),
+ ac(nullptr),
+ serviceState(ServiceState::DISCONNECTED),
+ clientCallbacksEnabled(true),
+ connectionRetryTimer(engine),
+ connectionRetryTimerDuration(std::chrono::seconds(1)),
+ connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
+ logger(logger)
+
+{
+ connect();
+}
+
+AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
+{
+ disconnectHiredis();
+}
+
+void AsyncHiredisCommandDispatcher::connect()
+{
+ ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
+ if (ac == nullptr || ac->err)
+ {
+ setDisconnected();
+ return;
+ }
+ ac->data = this;
+ adapter->attach(ac);
+ hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
+ hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
+}
+
+void AsyncHiredisCommandDispatcher::verifyConnection()
+{
+ /* When Redis has max amount of users, it will still accept new connections but will
+ * close them immediately. Therefore, we need to verify that just established connection
+ * really works. This prevents calling client readyAck callback for a connection that
+ * will be terminated immediately.
+ */
+ /* Connection verification is now done by doing redis command list query. Because we anyway
+ * need to verify that Redis has required commands, we can now combine these two operations
+ * (command list query and connection verification). If either one of the functionalities
+ * is not needed in the future and it is removed, remember to still leave the other one.
+ */
+ serviceState = ServiceState::CONNECTION_VERIFICATION;
+ /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
+ * we are spontaneously connected to redis while timer is running. If connection verification
+ * fails, timer is armed again (normal handling in connection verification).
+ */
+ connectionRetryTimer.disarm();
+ dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ contentsBuilder->build("COMMAND"),
+ false);
+}
+
+void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
+ const redis::Reply& reply)
+{
+ if(error)
+ {
+ logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
+ << error.message();
+
+ if (!connectionRetryTimer.isArmed())
+ {
+ /* Typically if connection verification fails, hiredis will call disconnect callback and
+ * whole connection establishment procedure will be restarted via that. To ensure that
+ * we will retry verification even if connection would not be disconnected this timer
+ * is set. If connection is later disconnected, this timer is disarmed (when disconnect
+ * callback handling arms this timer again).
+ */
+ armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
+ std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
+ }
+ }
+ else
+ {
+ if (checkRedisModuleCommands(parseCommandListReply(reply)))
+ setConnected();
+ else
+ SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
+ }
+}
+
+void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
+{
+ this->connectAck = connectAck;
+ if (serviceState == ServiceState::CONNECTED)
+ engine.postCallback(connectAck);
+}
+
+void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
+{
+ disconnectCallback = disconnectCb;
+}
+
+void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+ const AsyncConnection::Namespace&,
+ const Contents& contents)
+{
+ dispatchAsync(commandCb, contents, true);
+}
+
+void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
+ const Contents& contents,
+ bool checkConnectionState)
+{
+ if (checkConnectionState && serviceState != ServiceState::CONNECTED)
+ {
+ engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
+ this,
+ commandCb,
+ std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
+ return;
+ }
+ cbs.push_back(commandCb);
+ std::vector<const char*> chars;
+ std::transform(contents.stack.begin(), contents.stack.end(),
+ std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
+ if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
+ &chars[0], &contents.sizes[0]) != REDIS_OK)
+ {
+ removeCb(cbs.back());
+ engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
+ this,
+ commandCb,
+ getRedisError(ac->err, ac->errstr, nullptr)));
+ }
+}
+
+void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
+{
+ clientCallbacksEnabled = false;
+}
+
+void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
+ const std::error_code& error)
+{
+ commandCb(error, AsyncRedisReply());
+}
+
+void AsyncHiredisCommandDispatcher::setConnected()
+{
+ serviceState = ServiceState::CONNECTED;
+
+ if (connectAck)
+ {
+ connectAck();
+ connectAck = ConnectAck();
+ }
+}
+
+void AsyncHiredisCommandDispatcher::setDisconnected()
+{
+ serviceState = ServiceState::DISCONNECTED;
+
+ if (disconnectCallback)
+ disconnectCallback();
+
+ armConnectionRetryTimer(connectionRetryTimerDuration,
+ std::bind(&AsyncHiredisCommandDispatcher::connect, this));
+}
+
+void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
+ const std::error_code& error,
+ const redisReply* rr)
+{
+ if (!isValidCb(commandCb))
+ SHAREDDATALAYER_ABORT("Invalid callback function.");
+ if (error)
+ commandCb(error, AsyncRedisReply());
+ else
+ commandCb(error, AsyncRedisReply(*rr));
+ if (!usePermanentCommandCallbacks)
+ removeCb(commandCb);
+}
+
+bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
+{
+ return clientCallbacksEnabled;
+}
+
+bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
+{
+ for (auto i(cbs.begin()); i != cbs.end(); ++i)
+ if (&*i == &commandCb)
+ return true;
+ return false;
+}
+
+void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
+{
+ for (auto i(cbs.begin()); i != cbs.end(); ++i)
+ if (&*i == &commandCb)
+ {
+ cbs.erase(i);
+ break;
+ }
+}
+
+void AsyncHiredisCommandDispatcher::disconnectHiredis()
+{
+ /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
+ * if ac is a valid pointer).
+ */
+ if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
+ hiredisSystem.redisAsyncFree(ac);
+
+ //disconnect callback handler will update serviceState
+}
+
+void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
+ std::function<void()> retryAction)
+{
+ connectionRetryTimer.arm(duration,
+ [retryAction] () { retryAction(); });
+}