2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
27 #include "private/abort.hpp"
28 #include "private/createlogger.hpp"
29 #include "private/error.hpp"
30 #include "private/logger.hpp"
31 #include "private/redis/asyncredisreply.hpp"
32 #include "private/redis/reply.hpp"
33 #include "private/redis/hiredisclustersystem.hpp"
34 #include "private/engine.hpp"
35 #include "private/redis/hiredisclusterepolladapter.hpp"
36 #include "private/redis/contents.hpp"
37 #include "private/redis/redisgeneral.hpp"
39 using namespace shareddatalayer;
40 using namespace shareddatalayer::redis;
// Connect callback; connect() registers it with redisClusterAsyncSetConnectCallback.
// Formats the socket fd of the node context `ac` into a debug message and passes
// it to logDebugOnce. The cluster context argument is unnamed and not used.
// NOTE(review): `status` is not referenced on the lines shown here; confirm
// whether the logging is meant to be conditional on it.
44 void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
48 std::ostringstream msg;
49 msg << "redis cluster instance connected, fd: " << ac->c.fd;
50 logDebugOnce(msg.str());
// Disconnect callback; connect() registers it with redisClusterAsyncSetDisconnectCallback.
// Logs err, errstr and the socket fd of the node context `ac`, then forwards the
// event to the dispatcher instance.
// NOTE(review): the message is labelled "status:" but the value streamed is
// ac->err; the `status` parameter itself is not used on the lines shown.
54 void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
56 std::ostringstream msg;
57 msg << "redis cluster instance disconnected, status: " << ac->err
58 << ", " << ac->errstr << ", fd: " << ac->c.fd << std::endl;
59 logDebugOnce(msg.str());
// acc->data is treated as a pointer to the owning dispatcher.
61 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
62 instance->handleDisconnect(ac);
// Reply callback that dispatchAsync() hands to redisClusterAsyncCommandArgvWithKey.
//   acc - cluster context; acc->data is treated as the owning dispatcher.
//   rr  - treated as a redisReply pointer.
//   pd  - treated as a pointer to the stored CommandCb (dispatchAsync passes &cbs.back()).
// The reply is forwarded to handleReply() only while client callbacks are enabled.
// Note: the local variable `cb` shadows this function's own name.
65 void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
67 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
68 auto reply(static_cast<redisReply*>(rr));
69 auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
70 if (instance->isClientCallbacksEnabled())
// The error code is derived from the context's err/errstr together with the reply.
71 instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
// Convenience constructor. Delegates to the overload that additionally takes a
// HiredisClusterSystem and an adapter, supplying HiredisClusterSystem::getInstance()
// and a newly created HiredisClusterEpollAdapter built from `engine`.
// NOTE(review): the remaining forwarded arguments are not visible on the lines
// shown; presumably ns, addresses, contentsBuilder and logger are passed through.
75 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
76 const boost::optional<std::string>& ns,
77 const DatabaseConfiguration::Addresses& addresses,
78 std::shared_ptr<ContentsBuilder> contentsBuilder,
79 bool usePermanentCommandCallbacks,
80 std::shared_ptr<Logger> logger):
81 AsyncHiredisClusterCommandDispatcher(engine,
85 usePermanentCommandCallbacks,
86 HiredisClusterSystem::getInstance(),
87 std::make_shared<HiredisClusterEpollAdapter>(engine),
// Full constructor. Stores the supplied namespace, addresses, contents builder,
// permanent-callback flag and hiredis cluster system in members.
// Initial state set here: serviceState is DISCONNECTED, client callbacks are
// enabled, and the connection retry timer (bound to `engine`) uses a period of
// one second.
// NOTE(review): the initialisers for engine, adapter, acc and logger and the
// constructor body are not visible on the lines shown.
92 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
93 const boost::optional<std::string>& ns,
94 const DatabaseConfiguration::Addresses& addresses,
95 std::shared_ptr<ContentsBuilder> contentsBuilder,
96 bool usePermanentCommandCallbacks,
97 HiredisClusterSystem& hiredisClusterSystem,
98 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
99 std::shared_ptr<Logger> logger):
101 initialNamespace(ns),
102 addresses(addresses),
103 contentsBuilder(contentsBuilder),
104 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
105 hiredisClusterSystem(hiredisClusterSystem),
108 serviceState(ServiceState::DISCONNECTED),
109 clientCallbacksEnabled(true),
110 connectionRetryTimer(engine),
111 connectionRetryTimerDuration(std::chrono::seconds(1)),
// Destructor: tears the cluster connection down through disconnectHiredisCluster().
117 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
119 disconnectHiredisCluster();
// Starts (or restarts) the asynchronous connection to the Redis cluster.
// A new cluster context is requested for the configured addresses with
// HIRCLUSTER_FLAG_ROUTE_USE_SLOTS. Both failure paths log an error and arm the
// retry timer, whose expiry calls connect() again. On the remaining path the
// connect and disconnect callbacks are registered on the context.
// NOTE(review): the conditions guarding the two failure branches are not visible
// on the lines shown; judging by the log texts they are presumably a null
// context and a non-zero acc->err. Confirm, and also confirm what happens
// between the failure handling and the callback registration.
122 void AsyncHiredisClusterCommandDispatcher::connect()
124 // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
125 disconnectHiredisCluster();
126 acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
127 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
130 logger->error() << "SDL: connecting to redis cluster failed, null context returned";
131 armConnectionRetryTimer();
136 logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
137 armConnectionRetryTimer();
142 hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
143 hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
// Sends a "COMMAND" query through dispatchAsync() with verifyConnectionReply()
// bound as the reply handler. The namespace used for the query is the initial
// namespace when one was given at construction, otherwise the literal "namespace".
// The rationale for combining connection verification with the command list
// query is given in the comment inside the function.
147 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
149 /* redisClusterAsyncConnect only queries available cluster nodes but it does
150 * not connect to any cluster node (as it does not know to which node it should connect to).
151 * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
152 * determined to which cluster node this instance will connect to. We do initial operation
153 * to get connection to right redis node established already now. This also verifies that
154 * connection really works. When Redis has max amount of users, it will still accept new
155 * connections but it will close them immediately. Therefore, we need to verify that just
156 * established connection really works.
158 /* Connection setup/verification is now done by doing redis command list query. Because we anyway
159 * need to verify that Redis has required commands, we can now combine these two operations
160 * (command list query and connection setup/verification). If either one of the functionalities
161 * is not needed in the future and it is removed, remember to still leave the other one.
163 /* Non namespace-specific command list query can be used for connection setup purposes,
164 * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
165 * commands dispatched.
168 /* If initial namespace was not given during dispatcher creation (multi namespace API),
169 * verification is sent to hardcoded namespace. This works for verification purposes
170 * because in our environment cluster is configured to operate only if all nodes
171 * are working (so we can do verification to any node). However, this is not optimal
172 * because we do not necessarily connect to such cluster node which will later be
173 * used by client. Also our cluster configuration can change. This needs to be
174 * optimized later (perhaps to connect to all nodes). */
175 std::string nsForVerification;
176 if (initialNamespace)
177 nsForVerification = *initialNamespace;
179 nsForVerification = "namespace";
181 dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
183 std::placeholders::_1,
184 std::placeholders::_2),
186 contentsBuilder->build("COMMAND"),
// Reply handler for the verification query sent by verifyConnection().
// Outcomes visible on these lines:
//   - a verification failure is logged and the connection retry timer is armed;
//   - the reply is parsed with parseCommandListReply() and checked with
//     checkRedisModuleCommands();
//   - the process aborts with a message about missing Redis module extension commands.
// NOTE(review): the branch conditions and the action taken on success are not
// visible on the lines shown; confirm against the full definition.
190 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
194 logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
196 armConnectionRetryTimer();
200 if (checkRedisModuleCommands(parseCommandListReply(reply)))
203 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
// Stores `connectAck` in the member of the same name. If the dispatcher is
// already CONNECTED, the acknowledgement is posted to the engine right away
// (via postCallback, i.e. not invoked directly from this call).
207 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
209 this->connectAck = connectAck;
210 if (serviceState == ServiceState::CONNECTED)
211 engine.postCallback(connectAck);
// Stores the callback that handleDisconnect() invokes when it is set.
// A later registration replaces the earlier one.
214 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
216 disconnectCallback = disconnectCb;
// Dispatches a command with connection-state checking enabled: forwards to the
// four-argument overload with checkConnectionState set to true.
//   commandCb - receives the error code and reply for this command.
//   ns        - namespace the command is dispatched under.
//   contents  - command contents to send.
219 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
220 const AsyncConnection::Namespace& ns,
221 const Contents& contents)
223 dispatchAsync(commandCb, ns, contents, true);
// Sends `contents` to the cluster with `ns` as the key, arranging for
// `commandCb` to receive the outcome.
//   checkConnectionState - when true and the dispatcher is not CONNECTED, a
//                          call to callCommandCbWithError with NOT_CONNECTED is
//                          posted to the engine instead.
// NOTE(review): the arguments bound on the omitted lines, and whether the
// NOT_CONNECTED path returns before the command is sent, are not visible on the
// lines shown; confirm against the full definition.
226 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
227 const AsyncConnection::Namespace& ns,
228 const Contents& contents,
229 bool checkConnectionState)
231 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
233 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
236 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
// A copy of the callback is stored in cbs; the address of that stored copy is
// what hiredis receives as private data and what isValidCb() later compares against.
// NOTE(review): this relies on cbs keeping element addresses stable; confirm the container type.
239 cbs.push_back(commandCb);
// chars holds borrowed c_str() pointers into contents.stack; nothing is copied.
240 std::vector<const char*> chars;
241 std::transform(contents.stack.begin(), contents.stack.end(),
242 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
// NOTE(review): &chars[0] and &contents.sizes[0] presume non-empty contents.
243 if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
244 static_cast<int>(contents.stack.size()), &chars[0],
245 &contents.sizes[0]) != REDIS_OK)
// The command was not accepted: drop the stored callback and post the error
// derived from the context's err/errstr.
247 removeCb(cbs.back());
248 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
251 getRedisError(acc->err, acc->errstr, nullptr)));
// Turns off delivery of replies to client callbacks: the hiredis reply callback
// consults this flag through isClientCallbacksEnabled() before calling handleReply().
// No counterpart that re-enables the flag is visible in this file.
255 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
257 clientCallbacksEnabled = false;
// Invokes `commandCb` with `error` and a default-constructed AsyncRedisReply.
// dispatchAsync() posts this to the engine on its error paths.
260 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
262 commandCb(error, AsyncRedisReply());
// Marks the dispatcher CONNECTED and resets connectAck to an empty ConnectAck.
// NOTE(review): the lines between these two statements are not visible here;
// presumably a pending connectAck is invoked before it is cleared. Confirm.
265 void AsyncHiredisClusterCommandDispatcher::setConnected()
267 serviceState = ServiceState::CONNECTED;
272 connectAck = ConnectAck();
// Arms connectionRetryTimer with connectionRetryTimerDuration (initialised to
// one second in the constructor); when it fires, connect() is called again.
// NOTE(review): the lambda captures `this`, so the timer must not fire after
// this object is destroyed; confirm the timer's lifetime guarantees.
276 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
278 connectionRetryTimer.arm(connectionRetryTimerDuration,
279 [this] () { connect(); });
// Delivers a reply to `commandCb`.
//   commandCb - must be the stored entry in cbs (checked by address through
//               isValidCb); otherwise the process aborts.
//   error     - error code passed on to the callback.
//   rr        - raw hiredis reply; wrapped in AsyncRedisReply on one of the two
//               invocation paths.
// NOTE(review): the condition choosing between the empty reply and the wrapped
// *rr, and the action taken when usePermanentCommandCallbacks is false, are not
// visible on the lines shown; presumably a null check on rr and removal of the
// stored callback. Confirm.
283 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
284 const std::error_code& error,
285 const redisReply* rr)
287 if (!isValidCb(commandCb))
288 SHAREDDATALAYER_ABORT("Invalid callback function.");
290 commandCb(error, AsyncRedisReply());
292 commandCb(error, AsyncRedisReply(*rr));
293 if (!usePermanentCommandCallbacks)
// Returns the clientCallbacksEnabled flag: true from construction until
// disableCommandCallbacks() is called.
297 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
299 return clientCallbacksEnabled;
// Scans cbs for the element whose address equals the address of `commandCb`.
// The comparison is by identity (address), not by value.
// NOTE(review): the return statements are not visible on the lines shown;
// presumably true on a match and false otherwise. Confirm.
302 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
304 for (auto i(cbs.begin()); i != cbs.end(); ++i)
305 if (&*i == &commandCb)
// Scans cbs for the element whose address equals the address of `commandCb`
// (identity comparison, not value comparison).
// NOTE(review): the action taken on a match is not visible on the lines shown;
// presumably the element is erased and the scan stops. Confirm.
310 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
312 for (auto i(cbs.begin()); i != cbs.end(); ++i)
313 if (&*i == &commandCb)
// Called from the hiredis disconnect callback. Invokes the callback registered
// through registerDisconnectCb() when one is set.
// NOTE(review): the statements preceding the check are not visible on the lines
// shown, and `ac` is not referenced on them; confirm against the full definition.
320 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
324 if (disconnectCallback)
325 disconnectCallback();
// Frees the cluster context through redisClusterAsyncFree, but only when
// serviceState is CONNECTED (the comment inside the function gives the reason),
// then sets serviceState to DISCONNECTED.
// NOTE(review): a context obtained by connect() that never reached CONNECTED is
// not freed on this path; confirm that this does not leak across retries.
328 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
330 /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
331 * if acc is a valid pointer).
333 if (serviceState == ServiceState::CONNECTED)
334 hiredisClusterSystem.redisClusterAsyncFree(acc);
336 serviceState = ServiceState::DISCONNECTED;