2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include "private/redis/asynchirediscommanddispatcher.hpp"
27 #include <arpa/inet.h>
28 #include "private/abort.hpp"
29 #include "private/createlogger.hpp"
30 #include "private/engine.hpp"
31 #include "private/error.hpp"
32 #include "private/logger.hpp"
33 #include "private/redis/asyncredisreply.hpp"
34 #include "private/redis/reply.hpp"
35 #include "private/redis/hiredissystem.hpp"
36 #include "private/redis/hiredisepolladapter.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/redis/redisgeneral.hpp"
40 using namespace shareddatalayer;
41 using namespace shareddatalayer::redis;
// Connect callback registered with hiredis in connect() through
// redisAsyncSetConnectCallback. The dispatcher instance is recovered from ac->data.
// A zero status is treated as a successful connection (isConnected = !status).
// NOTE(review): the branch lines between the statements below are not visible in this
// listing; presumably the log + verifyConnection() path runs when isConnected is true
// and setDisconnected() runs otherwise -- confirm against the complete file.
45 void connectCb(const redisAsyncContext* ac, int status)
47 bool isConnected = !status;
48 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
// Log the socket fd of the new connection, then start connection verification.
52 std::ostringstream msg;
53 msg << "redis connected, fd: " << ac->c.fd;
54 logInfoOnce(msg.str());
55 instance->verifyConnection();
58 instance->setDisconnected();
// Disconnect callback registered with hiredis in connect() through
// redisAsyncSetDisconnectCallback. Logs the hiredis error code, error string and fd of
// the context, then moves the owning dispatcher (ac->data) to the disconnected state.
// NOTE(review): status is not referenced by any visible line; the logging may be guarded
// by a condition on a line missing from this listing -- confirm.
62 void disconnectCb(const redisAsyncContext* ac, int status)
65 std::ostringstream msg;
66 msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
67 logErrorOnce(msg.str());
69 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
70 instance->setDisconnected();
// Reply callback passed to redisAsyncCommandArgv by dispatchAsync.
//   ac - hiredis async context; ac->data holds the owning dispatcher
//   rr - reply pointer, cast to redisReply
//   pd - private data, cast to the CommandCb that dispatchAsync stored in cbs
// The reply is forwarded to handleReply only while client callbacks are enabled
// (see disableCommandCallbacks); the error code is derived with getRedisError.
73 void cb(redisAsyncContext* ac, void* rr, void* pd)
75 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
76 auto reply(static_cast<redisReply*>(rr));
77 auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
78 if (instance->isClientCallbacksEnabled())
79 instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
// Convenience constructor: delegates to the constructor that takes an explicit
// HiredisSystem and HiredisEpollAdapter, supplying HiredisSystem::getHiredisSystem() and
// a HiredisEpollAdapter created on the given engine.
// NOTE(review): some parameter and forwarded-argument lines are not visible in this
// listing, so the complete signature cannot be documented from here.
83 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
84 const std::string& address,
86 std::shared_ptr<ContentsBuilder> contentsBuilder,
87 bool usePermanentCommandCallbacks,
88 std::shared_ptr<Logger> logger,
89 bool usedForSentinel):
90 AsyncHiredisCommandDispatcher(engine,
94 usePermanentCommandCallbacks,
95 HiredisSystem::getHiredisSystem(),
96 std::make_shared<HiredisEpollAdapter>(engine),
// Main constructor with injectable HiredisSystem and HiredisEpollAdapter.
// Visible initial state: serviceState is DISCONNECTED, client callbacks are enabled,
// the reconnect retry period is 1 second and the connection verification retry period
// is 10 seconds.
// NOTE(review): several member initializers and the constructor body are not visible in
// this listing; whether the constructor itself starts connecting cannot be told from here.
102 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
103 const std::string& address,
105 std::shared_ptr<ContentsBuilder> contentsBuilder,
106 bool usePermanentCommandCallbacks,
107 HiredisSystem& hiredisSystem,
108 std::shared_ptr<HiredisEpollAdapter> adapter,
109 std::shared_ptr<Logger> logger,
110 bool usedForSentinel):
114 contentsBuilder(contentsBuilder),
115 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
116 hiredisSystem(hiredisSystem),
119 serviceState(ServiceState::DISCONNECTED),
120 clientCallbacksEnabled(true),
121 connectionRetryTimer(engine),
122 connectionRetryTimerDuration(std::chrono::seconds(1)),
123 connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
125 usedForSentinel(usedForSentinel)
// Destructor.
// NOTE(review): the body is not visible in this listing; presumably it releases the
// hiredis context (see disconnectHiredis) -- confirm against the complete file.
131 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
// Starts an asynchronous hiredis connection to address:port and stores the resulting
// context in ac. On the path where the context is usable, connectCb and disconnectCb are
// registered on it.
// NOTE(review): the handling of a null or errored context (the lines following the check
// below) is not visible in this listing.
136 void AsyncHiredisCommandDispatcher::connect()
138 ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
139 if (ac == nullptr || ac->err)
146 hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
147 hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
// Verifies a freshly established connection: moves serviceState to
// CONNECTION_VERIFICATION, disarms the retry timer and dispatches a COMMAND query whose
// reply is delivered to verifyConnectionReply.
// NOTE(review): the trailing arguments of the dispatchAsync call are not visible in this
// listing; presumably the connection state check is disabled for this call, since the
// state is not CONNECTED at this point -- confirm.
150 void AsyncHiredisCommandDispatcher::verifyConnection()
156 /* When Redis has max amount of users, it will still accept new connections but will
157 * close them immediately. Therefore, we need to verify that just established connection
158 * really works. This prevents calling client readyAck callback for a connection that
159 * will be terminated immediately.
161 /* Connection verification is now done by doing redis command list query. Because we anyway
162 * need to verify that Redis has required commands, we can now combine these two operations
163 * (command list query and connection verification). If either one of the functionalities
164 * is not needed in the future and it is removed, remember to still leave the other one.
166 serviceState = ServiceState::CONNECTION_VERIFICATION;
167 /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
168 * we are spontaneously connected to redis while timer is running. If connection verification
169 * fails, timer is armed again (normal handling in connection verification).
171 connectionRetryTimer.disarm();
172 dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
174 std::placeholders::_1,
175 std::placeholders::_2),
176 contentsBuilder->build("COMMAND"),
// Completion handler for the COMMAND query sent by verifyConnection.
//   error - result code of the query
//   reply - command list reply, parsed with parseCommandListReply
// Visible behaviour: a failure is logged and, unless the retry timer is already armed,
// verifyConnection is scheduled again after connectionVerificationRetryTimerDuration.
// The parsed command list is checked with checkRedisModuleCommands, and the process is
// aborted when required module commands are missing.
// NOTE(review): the conditions and branch structure joining these statements are on lines
// missing from this listing; the success path presumably ends in setConnected() -- confirm.
181 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
182 const redis::Reply& reply)
186 logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
189 if (!connectionRetryTimer.isArmed())
191 /* Typically if connection verification fails, hiredis will call disconnect callback and
192 * whole connection establishment procedure will be restarted via that. To ensure that
193 * we will retry verification even if connection would not be disconnected this timer
194 * is set. If connection is later disconnected, this timer is disarmed (when disconnect
195 * callback handling arms this timer again).
197 armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
198 std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
203 if (checkRedisModuleCommands(parseCommandListReply(reply)))
206 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
// Stores the acknowledgement callback for a later connection event. If the dispatcher is
// already in the CONNECTED state, the callback is also posted to the engine right away,
// so it runs from the engine rather than inside this call.
210 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
212 this->connectAck = connectAck;
213 if (serviceState == ServiceState::CONNECTED)
214 engine.postCallback(connectAck);
// Stores the callback that setDisconnected() invokes on a disconnect. A later
// registration overwrites the previously stored callback.
217 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
219 disconnectCallback = disconnectCb;
// Public dispatch entry point. The namespace argument is unnamed and unused here; the
// call is forwarded to the three-argument overload with the connection state check
// enabled.
222 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
223 const AsyncConnection::Namespace&,
224 const Contents& contents)
226 dispatchAsync(commandCb, contents, true);
// Sends one Redis command asynchronously.
//   commandCb            - completion callback; a copy is stored in cbs
//   contents             - command arguments (contents.stack) and their lengths
//                          (contents.sizes)
//   checkConnectionState - when true and the state is not CONNECTED, a NOT_CONNECTED error
//                          is posted to commandCb through the engine
// NOTE(review): the statement following the NOT_CONNECTED post (presumably an early
// return) is on a line missing from this listing -- confirm.
229 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
230 const Contents& contents,
231 bool checkConnectionState)
233 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
235 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
238 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
// The address of the stored copy is handed to hiredis as private data and later compared
// by isValidCb and removeCb.
// NOTE(review): this relies on cbs keeping element addresses stable across push_back;
// the container type is not visible here -- confirm.
241 cbs.push_back(commandCb);
// Build the argv-style pointer array; the pointers refer into contents.stack.
242 std::vector<const char*> chars;
243 std::transform(contents.stack.begin(), contents.stack.end(),
244 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
// NOTE(review): &chars[0] and &contents.sizes[0] assume a non-empty command; indexing an
// empty vector is undefined behaviour. Confirm that callers never pass empty contents.
245 if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
246 &chars[0], &contents.sizes[0]) != REDIS_OK)
// Submission failed: drop the stored callback and report the hiredis error via the engine.
248 removeCb(cbs.back());
249 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
252 getRedisError(ac->err, ac->errstr, nullptr)));
// Clears the flag that the reply callback cb() checks through isClientCallbacksEnabled()
// before forwarding a reply to handleReply. No visible code sets the flag back to true
// after construction.
256 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
258 clientCallbacksEnabled = false;
// Invokes commandCb with the given error and a default-constructed AsyncRedisReply.
// Used by dispatchAsync, via engine.postCallback, to report failures.
261 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
262 const std::error_code& error)
264 commandCb(error, AsyncRedisReply());
// Marks the dispatcher as CONNECTED and resets the stored connectAck to an empty one.
// NOTE(review): the lines between the two statements are not visible in this listing;
// presumably a pending connectAck is invoked before it is cleared -- confirm.
267 void AsyncHiredisCommandDispatcher::setConnected()
269 serviceState = ServiceState::CONNECTED;
274 connectAck = ConnectAck();
// Marks the dispatcher as DISCONNECTED, notifies the registered disconnect callback when
// one is set, and arms the retry timer so that connect() is called again after
// connectionRetryTimerDuration.
278 void AsyncHiredisCommandDispatcher::setDisconnected()
280 serviceState = ServiceState::DISCONNECTED;
282 if (disconnectCallback)
283 disconnectCallback();
285 armConnectionRetryTimer(connectionRetryTimerDuration,
286 std::bind(&AsyncHiredisCommandDispatcher::connect, this));
// Delivers one hiredis reply to the client callback.
//   commandCb - must be the very object stored in cbs (checked by address in isValidCb);
//               anything else aborts the process
//   error     - error code computed by the caller
//   rr        - raw hiredis reply
// The callback receives either an empty AsyncRedisReply or one wrapping *rr.
// NOTE(review): the condition choosing between the two calls is on a line missing from
// this listing (presumably a null check on rr), as is the statement guarded by
// usePermanentCommandCallbacks (presumably removal of the stored callback) -- confirm.
289 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
290 const std::error_code& error,
291 const redisReply* rr)
293 if (!isValidCb(commandCb))
294 SHAREDDATALAYER_ABORT("Invalid callback function.");
296 commandCb(error, AsyncRedisReply());
298 commandCb(error, AsyncRedisReply(*rr));
299 if (!usePermanentCommandCallbacks)
// Returns the flag that is set to true at construction and cleared by
// disableCommandCallbacks().
303 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
305 return clientCallbacksEnabled;
// Scans cbs for the element whose address equals the address of commandCb. The match is
// by object identity, not by comparing callable values.
// NOTE(review): the return statements are on lines missing from this listing; presumably
// true on a match and false otherwise -- confirm.
308 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
310 for (auto i(cbs.begin()); i != cbs.end(); ++i)
311 if (&*i == &commandCb)
// Scans cbs for the element whose address equals the address of commandCb (identity
// match, the same test as isValidCb).
// NOTE(review): the action taken on a match is on lines missing from this listing;
// presumably the element is erased and the loop ends -- confirm.
316 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
318 for (auto i(cbs.begin()); i != cbs.end(); ++i)
319 if (&*i == &commandCb)
// Frees the hiredis async context, but only while the dispatcher is in the CONNECTED or
// CONNECTION_VERIFICATION state; in any other state ac is left untouched.
326 void AsyncHiredisCommandDispatcher::disconnectHiredis()
328 /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
329 * if ac is a valid pointer).
331 if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
332 hiredisSystem.redisAsyncFree(ac);
334 //disconnect callback handler will update serviceState
// Arms connectionRetryTimer to run retryAction after the given duration.
//   duration    - delay before the action runs
//   retryAction - action to run; captured by copy in the timer lambda
337 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
338 std::function<void()> retryAction)
340 connectionRetryTimer.arm(duration,
341 [retryAction] () { retryAction(); });