e4195a945cb2d3ac4d7d44bd515bdef32f6dcc76
[ric-plt/sdl.git] / src / redis / asynchirediscommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "private/redis/asynchirediscommanddispatcher.hpp"
18 #include <algorithm>
19 #include <cstring>
20 #include <cerrno>
21 #include <sstream>
22 #include <arpa/inet.h>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/error.hpp"
27 #include "private/logger.hpp"
28 #include "private/redis/asyncredisreply.hpp"
29 #include "private/redis/reply.hpp"
30 #include "private/redis/hiredissystem.hpp"
31 #include "private/redis/hiredisepolladapter.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/redisgeneral.hpp"
34
35 using namespace shareddatalayer;
36 using namespace shareddatalayer::redis;
37
38 namespace
39 {
40     void connectCb(const redisAsyncContext* ac, int status)
41     {
42         bool isConnected = !status;
43         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
44
45         if (isConnected)
46         {
47             std::ostringstream msg;
48             msg << "redis connected, fd: " << ac->c.fd;
49             logInfoOnce(msg.str());
50             instance->verifyConnection();
51         }
52         else
53             instance->setDisconnected();
54
55     }
56
57     void disconnectCb(const redisAsyncContext* ac, int status)
58     {
59         if (status) {
60             std::ostringstream msg;
61             msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
62             logErrorOnce(msg.str());
63         }
64         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
65         instance->setDisconnected();
66     }
67
68     void cb(redisAsyncContext* ac, void* rr, void* pd)
69     {
70         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
71         auto reply(static_cast<redisReply*>(rr));
72         auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
73         if (instance->isClientCallbacksEnabled())
74             instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
75     }
76 }
77
78 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
79                                                              const std::string& address,
80                                                              uint16_t port,
81                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
82                                                              bool usePermanentCommandCallbacks,
83                                                              std::shared_ptr<Logger> logger):
84     AsyncHiredisCommandDispatcher(engine,
85                                   address,
86                                   port,
87                                   contentsBuilder,
88                                   usePermanentCommandCallbacks,
89                                   HiredisSystem::getHiredisSystem(),
90                                   std::make_shared<HiredisEpollAdapter>(engine),
91                                   logger)
92 {
93 }
94
95 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
96                                                              const std::string& address,
97                                                              uint16_t port,
98                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
99                                                              bool usePermanentCommandCallbacks,
100                                                              HiredisSystem& hiredisSystem,
101                                                              std::shared_ptr<HiredisEpollAdapter> adapter,
102                                                              std::shared_ptr<Logger> logger):
103     engine(engine),
104     address(address),
105     port(ntohs(port)),
106     contentsBuilder(contentsBuilder),
107     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
108     hiredisSystem(hiredisSystem),
109     adapter(adapter),
110     ac(nullptr),
111     serviceState(ServiceState::DISCONNECTED),
112     clientCallbacksEnabled(true),
113     connectionRetryTimer(engine),
114     connectionRetryTimerDuration(std::chrono::seconds(1)),
115     connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
116     logger(logger)
117
118 {
119     connect();
120 }
121
122 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
123 {
124     disconnectHiredis();
125 }
126
127 void AsyncHiredisCommandDispatcher::connect()
128 {
129     ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
130     if (ac == nullptr || ac->err)
131     {
132         setDisconnected();
133         return;
134     }
135     ac->data = this;
136     adapter->attach(ac);
137     hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
138     hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
139 }
140
141 void AsyncHiredisCommandDispatcher::verifyConnection()
142 {
143    /* When Redis has max amount of users, it will still accept new connections but will
144     * close them immediately. Therefore, we need to verify that just established connection
145     * really works. This prevents calling client readyAck callback for a connection that
146     * will be terminated immediately.
147     */
148     /* Connection verification is now done by doing redis command list query. Because we anyway
149      * need to verify that Redis has required commands, we can now combine these two operations
150      * (command list query and connection verification). If either one of the functionalities
151      * is not needed in the future and it is removed, remember to still leave the other one.
152      */
153     serviceState = ServiceState::CONNECTION_VERIFICATION;
154     /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
155      * we are spontaneously connected to redis while timer is running. If connection verification
156      * fails, timer is armed again (normal handling in connection verification).
157      */
158     connectionRetryTimer.disarm();
159     dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
160                             this,
161                             std::placeholders::_1,
162                             std::placeholders::_2),
163                   contentsBuilder->build("COMMAND"),
164                   false);
165 }
166
167 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
168                                                           const redis::Reply& reply)
169 {
170     if(error)
171     {
172         logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
173                         << error.message();
174
175         if (!connectionRetryTimer.isArmed())
176         {
177             /* Typically if connection verification fails, hiredis will call disconnect callback and
178              * whole connection establishment procedure will be restarted via that. To ensure that
179              * we will retry verification even if connection would not be disconnected this timer
180              * is set. If connection is later disconnected, this timer is disarmed (when disconnect
181              * callback handling arms this timer again).
182              */
183             armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
184                                     std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
185         }
186     }
187     else
188     {
189         if (checkRedisModuleCommands(parseCommandListReply(reply)))
190             setConnected();
191         else
192             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
193     }
194 }
195
196 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
197 {
198     this->connectAck = connectAck;
199     if (serviceState == ServiceState::CONNECTED)
200         engine.postCallback(connectAck);
201 }
202
203 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
204 {
205     disconnectCallback = disconnectCb;
206 }
207
208 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
209                                                   const AsyncConnection::Namespace&,
210                                                   const Contents& contents)
211 {
212     dispatchAsync(commandCb, contents, true);
213 }
214
215 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
216                                                   const Contents& contents,
217                                                   bool checkConnectionState)
218 {
219     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
220     {
221         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
222                                        this,
223                                        commandCb,
224                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
225         return;
226     }
227     cbs.push_back(commandCb);
228     std::vector<const char*> chars;
229     std::transform(contents.stack.begin(), contents.stack.end(),
230                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
231     if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
232                                             &chars[0], &contents.sizes[0]) != REDIS_OK)
233     {
234         removeCb(cbs.back());
235         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
236                                        this,
237                                        commandCb,
238                                        getRedisError(ac->err, ac->errstr, nullptr)));
239     }
240 }
241
242 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
243 {
244     clientCallbacksEnabled = false;
245 }
246
247 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
248                                                            const std::error_code& error)
249 {
250     commandCb(error, AsyncRedisReply());
251 }
252
253 void AsyncHiredisCommandDispatcher::setConnected()
254 {
255     serviceState = ServiceState::CONNECTED;
256
257     if (connectAck)
258     {
259         connectAck();
260         connectAck = ConnectAck();
261     }
262 }
263
264 void AsyncHiredisCommandDispatcher::setDisconnected()
265 {
266     serviceState = ServiceState::DISCONNECTED;
267
268     if (disconnectCallback)
269         disconnectCallback();
270
271     armConnectionRetryTimer(connectionRetryTimerDuration,
272                             std::bind(&AsyncHiredisCommandDispatcher::connect, this));
273 }
274
275 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
276                                                 const std::error_code& error,
277                                                 const redisReply* rr)
278 {
279     if (!isValidCb(commandCb))
280         SHAREDDATALAYER_ABORT("Invalid callback function.");
281     if (error)
282         commandCb(error, AsyncRedisReply());
283     else
284         commandCb(error, AsyncRedisReply(*rr));
285     if (!usePermanentCommandCallbacks)
286         removeCb(commandCb);
287 }
288
289 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
290 {
291     return clientCallbacksEnabled;
292 }
293
294 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
295 {
296     for (auto i(cbs.begin()); i != cbs.end(); ++i)
297         if (&*i == &commandCb)
298             return true;
299     return false;
300 }
301
302 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
303 {
304     for (auto i(cbs.begin()); i != cbs.end(); ++i)
305         if (&*i == &commandCb)
306         {
307             cbs.erase(i);
308             break;
309         }
310 }
311
312 void AsyncHiredisCommandDispatcher::disconnectHiredis()
313 {
314     /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
315      * if ac is a valid pointer).
316      */
317     if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
318         hiredisSystem.redisAsyncFree(ac);
319
320     //disconnect callback handler will update serviceState
321 }
322
323 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
324                                                             std::function<void()> retryAction)
325 {
326     connectionRetryTimer.arm(duration,
327                              [retryAction] () { retryAction(); });
328 }