2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
27 #include "private/abort.hpp"
28 #include "private/createlogger.hpp"
29 #include "private/error.hpp"
30 #include "private/logger.hpp"
31 #include "private/redis/asyncredisreply.hpp"
32 #include "private/redis/reply.hpp"
33 #include "private/redis/hiredisclustersystem.hpp"
34 #include "private/engine.hpp"
35 #include "private/redis/hiredisclusterepolladapter.hpp"
36 #include "private/redis/contents.hpp"
37 #include "private/redis/redisgeneral.hpp"
39 using namespace shareddatalayer;
40 using namespace shareddatalayer::redis;
// Connect callback taking (cluster context, node context, status). connect()
// registers it with redisClusterAsyncSetConnectCallback.
// The cluster context parameter is unnamed and unused.
// NOTE(review): `status` is not referenced on any line visible in this view;
// confirm whether the logging below is meant to be conditional on it.
44 void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
// Compose a debug message that carries the socket fd of the node context.
48 std::ostringstream msg;
49 msg << "redis cluster instance connected, fd: " << ac->c.fd;
// Emit through logDebugOnce (helper defined outside this file view).
50 logDebugOnce(msg.str());
// Disconnect callback taking (cluster context, node context, status). connect()
// registers it with redisClusterAsyncSetDisconnectCallback.
// NOTE(review): `status` is not referenced on any line visible in this view;
// confirm whether the logging below is meant to be conditional on it.
54 void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
// Compose a debug message with the node's socket fd. Note that the field
// labelled "status" in the message prints ac->err, not the `status` argument.
58 std::ostringstream msg;
59 msg << "redis cluster instance disconnected, fd: " << ac->c.fd
60 << ", status: " << ac->err;
61 logDebugOnce(msg.str());
// acc->data holds the owning dispatcher (cast back from void*); forward the
// node context to its handleDisconnect().
63 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
64 instance->handleDisconnect(ac);
// Reply callback handed to redisClusterAsyncCommandArgvWithKey by dispatchAsync().
//   acc - cluster context; acc->data is cast back to the owning dispatcher.
//   rr  - reply pointer, cast to redisReply*.
//   pd  - per-command private data, cast to CommandCb* (dispatchAsync passes
//         the address of the entry it appended to `cbs`).
67 void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
69 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
70 auto reply(static_cast<redisReply*>(rr));
// This local deliberately shares its name with the enclosing function and
// shadows it from here on.
71 auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
// Replies are forwarded only while client callbacks are enabled; otherwise
// nothing visible here is done with the reply. The error code passed on is
// derived by getRedisError from the context error fields and the reply.
72 if (instance->isClientCallbacksEnabled())
73 instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
// Convenience constructor: delegates to the overload that additionally takes
// a HiredisClusterSystem and a HiredisClusterEpollAdapter, supplying
// HiredisClusterSystem::getInstance() and a newly created adapter built from
// `engine`.
// NOTE(review): not every forwarded argument is visible in this view; confirm
// the remaining ones are passed through in declaration order.
77 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
78 const boost::optional<std::string>& ns,
79 const DatabaseConfiguration::Addresses& addresses,
80 std::shared_ptr<ContentsBuilder> contentsBuilder,
81 bool usePermanentCommandCallbacks,
82 std::shared_ptr<Logger> logger):
83 AsyncHiredisClusterCommandDispatcher(engine,
87 usePermanentCommandCallbacks,
88 HiredisClusterSystem::getInstance(),
89 std::make_shared<HiredisClusterEpollAdapter>(engine),
// Main constructor: takes the hiredis cluster system and the epoll adapter as
// explicit dependencies (the shorter overload supplies defaults for them).
// NOTE(review): some member initializers and the constructor body are not
// visible in this view.
94 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
95 const boost::optional<std::string>& ns,
96 const DatabaseConfiguration::Addresses& addresses,
97 std::shared_ptr<ContentsBuilder> contentsBuilder,
98 bool usePermanentCommandCallbacks,
99 HiredisClusterSystem& hiredisClusterSystem,
100 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
101 std::shared_ptr<Logger> logger):
// Optional namespace; verifyConnection() uses it when present.
103 initialNamespace(ns),
104 addresses(addresses),
105 contentsBuilder(contentsBuilder),
106 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
107 hiredisClusterSystem(hiredisClusterSystem),
// The dispatcher starts out disconnected, with client callbacks enabled.
110 serviceState(ServiceState::DISCONNECTED),
111 clientCallbacksEnabled(true),
// Reconnect attempts are scheduled on a timer with a one second duration.
112 connectionRetryTimer(engine),
113 connectionRetryTimerDuration(std::chrono::seconds(1)),
// Destructor: tears the cluster connection down via disconnectHiredisCluster().
119 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
121 disconnectHiredisCluster();
// Starts (or restarts) the asynchronous connection to the Redis cluster.
// Any existing context is dropped first, then a new one is requested for the
// configured addresses (formatted by formatToClusterSyntax) with
// HIRCLUSTER_FLAG_ROUTE_USE_SLOTS. Two failure paths are visible: each logs an
// error and arms the retry timer. Otherwise the connect and disconnect
// callbacks are registered on the new context.
// NOTE(review): the conditions selecting the failure paths, and any statements
// between them and the callback registration, are not visible in this view.
124 void AsyncHiredisClusterCommandDispatcher::connect()
126 // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
127 disconnectHiredisCluster();
128 acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
129 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
// Failure path 1: no context was returned.
132 logger->error() << "SDL: connecting to redis cluster failed, null context returned";
133 armConnectionRetryTimer();
// Failure path 2: a context exists but reports an error. Only acc->err is
// logged here; acc->errstr is not included in the message.
138 logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
139 armConnectionRetryTimer();
// Hook the file-level connectCb/disconnectCb functions into the new context.
144 hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
145 hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
// Issues a Redis "COMMAND" query through dispatchAsync with
// verifyConnectionReply bound as the reply handler. The rationale (connection
// setup plus verification, combined with the command list check) is given in
// the comments inside the body.
149 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
151 /* redisClusterAsyncConnect only queries available cluster nodes but it does
152 * not connect to any cluster node (as it does not know to which node it should connect to).
153 * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
154 * determined to which cluster node this instance will connect to. We do initial operation
155 * to get connection to right redis node established already now. This also verifies that
156 * connection really works. When Redis has max amount of users, it will still accept new
157 * connections but it will close them immediately. Therefore, we need to verify that just
158 * established connection really works.
160 /* Connection setup/verification is now done by doing redis command list query. Because we anyway
161 * need to verify that Redis has required commands, we can now combine these two operations
162 * (command list query and connection setup/verification). If either one of the functionalities
163 * is not needed in the future and it is removed, remember to still leave the other one.
165 /* Non namespace-specific command list query can be used for connection setup purposes,
166 * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
167 * commands dispatched.
170 /* If initial namespace was not given during dispatcher creation (multi namespace API),
171 * verification is sent to hardcoded namespace. This works for verification purposes
172 * because in our environment cluster is configured to operate only if all nodes
173 * are working (so we can do verification to any node). However, this is not optimal
174 * because we do not necessarily connect to such cluster node which will later be
175 * used by client. Also our cluster configuration can change. This needs to be
176 * optimized later (perhaps to connect to all nodes). */
// Namespace used as the routing key for the verification command.
177 std::string nsForVerification;
178 if (initialNamespace)
179 nsForVerification = *initialNamespace;
// Hardcoded fallback namespace described in the comment above.
181 nsForVerification = "namespace";
// Bind the member reply handler; the two placeholders receive the error code
// and the reply.
// NOTE(review): the remaining bind/dispatchAsync arguments are not all visible
// in this view.
183 dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
185 std::placeholders::_1,
186 std::placeholders::_2),
188 contentsBuilder->build("COMMAND"),
// Reply handler for the "COMMAND" query sent by verifyConnection().
//   error - error code reported for the verification command.
//   reply - reply to the command list query.
// NOTE(review): the branch structure (which lines run on error and which on
// success) is not visible in this view; the comments below describe each
// visible statement on its own.
192 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
// Verification failure handling: log, then schedule another connect attempt.
196 logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
198 armConnectionRetryTimer();
// Parse the command list from the reply and check it for the required Redis
// module commands.
202 if (checkRedisModuleCommands(parseCommandListReply(reply)))
// Abort path used when the required module commands are unavailable.
205 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
// Registers the callback to run once the dispatcher is connected.
//   connectAck - callback to store; it replaces any previously stored one.
// If the dispatcher is already CONNECTED, the callback is also posted to the
// engine right away (through postCallback, not invoked inline here).
209 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
// `this->` is needed because the parameter shadows the member of the same name.
211 this->connectAck = connectAck;
212 if (serviceState == ServiceState::CONNECTED)
213 engine.postCallback(connectAck);
// Stores the callback that handleDisconnect() invokes; replaces any
// previously registered one.
//   disconnectCb - callback to store.
216 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
218 disconnectCallback = disconnectCb;
// Dispatches a command with the connection state check enabled; forwards to
// the four-argument overload with checkConnectionState = true.
//   commandCb - callback that receives the result.
//   ns        - namespace passed on as the key argument of the command.
//   contents  - command arguments and their sizes.
221 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
222 const AsyncConnection::Namespace& ns,
223 const Contents& contents)
225 dispatchAsync(commandCb, ns, contents, true);
// Dispatches a command to the cluster with `ns` as the key argument.
//   commandCb            - callback that receives the result; a copy is stored in `cbs`.
//   ns                   - namespace passed as the key (pointer and length).
//   contents             - command arguments (`stack`) and their sizes (`sizes`).
//   checkConnectionState - when true and the dispatcher is not CONNECTED, a
//                          NOT_CONNECTED error is posted for the callback.
// NOTE(review): some bind arguments, and whatever follows the not-connected
// branch (for example an early return), are not visible in this view.
228 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
229 const AsyncConnection::Namespace& ns,
230 const Contents& contents,
231 bool checkConnectionState)
233 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
// The error is reported asynchronously through the engine rather than by
// calling the callback inline.
235 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
238 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
// Keep a copy of the callback in `cbs`; its address is handed to hiredis as
// the private data pointer and cast back in the file-level cb().
// NOTE(review): this relies on `cbs` keeping element addresses stable across
// insertions - confirm the container type.
241 cbs.push_back(commandCb);
// Build the argv array of C-string pointers into contents.stack.
242 std::vector<const char*> chars;
243 std::transform(contents.stack.begin(), contents.stack.end(),
244 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
// NOTE(review): &chars[0] and &contents.sizes[0] assume non-empty vectors;
// confirm callers never pass empty contents.
245 if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
246 static_cast<int>(contents.stack.size()), &chars[0],
247 &contents.sizes[0]) != REDIS_OK)
// Submission failed: drop the entry just stored and post the error (derived
// from the context error fields) asynchronously.
249 removeCb(cbs.back());
250 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
253 getRedisError(acc->err, acc->errstr, nullptr)));
// Turns off delivery of command replies: the file-level cb() checks this flag
// through isClientCallbacksEnabled() before calling handleReply().
257 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
259 clientCallbacksEnabled = false;
// Invokes `commandCb` with the given error and a default-constructed
// AsyncRedisReply. dispatchAsync() posts this for its failure paths.
//   commandCb - callback to invoke.
//   error     - error code to report.
262 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
264 commandCb(error, AsyncRedisReply());
// Marks the dispatcher as CONNECTED and resets the stored connect
// acknowledgement to an empty ConnectAck.
// NOTE(review): the statements between the state change and the reset
// (presumably invoking the stored ack) are not visible in this view - confirm.
267 void AsyncHiredisClusterCommandDispatcher::setConnected()
269 serviceState = ServiceState::CONNECTED;
// Clear the stored ack so it is not held any longer.
274 connectAck = ConnectAck();
// Arms the retry timer with connectionRetryTimerDuration and a callback that
// calls connect() again.
278 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
// The lambda captures `this`.
// NOTE(review): assumes the timer (a member of this object) cannot fire after
// the dispatcher is destroyed - confirm.
280 connectionRetryTimer.arm(connectionRetryTimerDuration,
281 [this] () { connect(); });
// Delivers a reply (or an error) to the command callback; called from the
// file-level cb().
//   commandCb - callback to invoke; must refer to an entry stored in `cbs`.
//   error     - error code derived for this reply.
//   rr        - raw hiredis reply; dereferenced on one of the paths below.
// NOTE(review): the condition choosing between the two callback invocations,
// and the statement guarded by the last `if`, are not visible in this view.
285 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
286 const std::error_code& error,
287 const redisReply* rr)
// A callback that is not (by address) an element of `cbs` is treated as fatal.
289 if (!isValidCb(commandCb))
290 SHAREDDATALAYER_ABORT("Invalid callback function.");
// Invocation with an empty reply object.
292 commandCb(error, AsyncRedisReply());
// Invocation with a reply object built from the raw hiredis reply.
294 commandCb(error, AsyncRedisReply(*rr));
// Extra handling applies only when callbacks are not permanent.
295 if (!usePermanentCommandCallbacks)
// Returns the flag that the constructor sets to true and
// disableCommandCallbacks() sets to false.
299 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
301 return clientCallbacksEnabled;
// Checks whether `commandCb` refers to an element stored in `cbs`.
// The comparison is by address (identity), not by value.
// NOTE(review): the return statements are not visible in this view.
304 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
306 for (auto i(cbs.begin()); i != cbs.end(); ++i)
// Same object as the stored element?
307 if (&*i == &commandCb)
// Searches `cbs` for the element that `commandCb` refers to, matching by
// address (identity), not by value.
// NOTE(review): the action taken on a match (presumably erasing the element
// and stopping the scan) is not visible in this view - confirm.
312 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
314 for (auto i(cbs.begin()); i != cbs.end(); ++i)
// Same object as the stored element?
315 if (&*i == &commandCb)
// Called from the file-level disconnectCb() when a node connection goes down.
//   ac - context of the disconnected node.
// NOTE(review): `ac` is not referenced on any line visible in this view;
// statements before the callback check may be missing.
322 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
// Notify the client only if a callback was set through registerDisconnectCb().
326 if (disconnectCallback)
327 disconnectCallback();
// Releases the hiredis cluster context and marks the dispatcher DISCONNECTED.
// The context is freed only when the state is CONNECTED; the comment in the
// body explains why (hiredis crash workaround).
// NOTE(review): when the state is not CONNECTED, no visible line frees `acc`;
// confirm the resulting leak on failed connects is an accepted trade-off.
330 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
332 /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
333 * if acc is a valid pointer).
335 if (serviceState == ServiceState::CONNECTED)
336 hiredisClusterSystem.redisClusterAsyncFree(acc);
338 serviceState = ServiceState::DISCONNECTED;