72708866e3eb95cae22c6e2b97ae58b5ee88ede2
[ric-plt/sdl.git] / src / redis / asynchiredisclustercommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
18 #include <algorithm>
19 #include <cstring>
20 #include <cerrno>
21 #include <sstream>
22 #include "private/abort.hpp"
23 #include "private/createlogger.hpp"
24 #include "private/error.hpp"
25 #include "private/logger.hpp"
26 #include "private/redis/asyncredisreply.hpp"
27 #include "private/redis/reply.hpp"
28 #include "private/redis/hiredisclustersystem.hpp"
29 #include "private/engine.hpp"
30 #include "private/redis/hiredisclusterepolladapter.hpp"
31 #include "private/redis/contents.hpp"
32 #include "private/redis/redisgeneral.hpp"
33
34 using namespace shareddatalayer;
35 using namespace shareddatalayer::redis;
36
37 namespace
38 {
39     void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
40     {
41         if (!status)
42         {
43             std::ostringstream msg;
44             msg << "redis cluster instance connected, fd: " << ac->c.fd;
45             logDebugOnce(msg.str());
46         }
47     }
48
49     void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
50     {
51         if (status)
52         {
53             std::ostringstream msg;
54             msg << "redis cluster instance disconnected, fd: " << ac->c.fd
55                 << ", status: " << ac->err;
56             logDebugOnce(msg.str());
57         }
58         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
59         instance->handleDisconnect(ac);
60     }
61
62     void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
63     {
64         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
65         auto reply(static_cast<redisReply*>(rr));
66         auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
67         if (instance->isClientCallbacksEnabled())
68             instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
69     }
70 }
71
72 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
73                                                                            const boost::optional<std::string>& ns,
74                                                                            const DatabaseConfiguration::Addresses& addresses,
75                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
76                                                                            bool usePermanentCommandCallbacks,
77                                                                            std::shared_ptr<Logger> logger):
78     AsyncHiredisClusterCommandDispatcher(engine,
79                                          ns,
80                                          addresses,
81                                          contentsBuilder,
82                                          usePermanentCommandCallbacks,
83                                          HiredisClusterSystem::getInstance(),
84                                          std::make_shared<HiredisClusterEpollAdapter>(engine),
85                                          logger)
86 {
87 }
88
89 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
90                                                                            const boost::optional<std::string>& ns,
91                                                                            const DatabaseConfiguration::Addresses& addresses,
92                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
93                                                                            bool usePermanentCommandCallbacks,
94                                                                            HiredisClusterSystem& hiredisClusterSystem,
95                                                                            std::shared_ptr<HiredisClusterEpollAdapter> adapter,
96                                                                            std::shared_ptr<Logger> logger):
97     engine(engine),
98     initialNamespace(ns),
99     addresses(addresses),
100     contentsBuilder(contentsBuilder),
101     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
102     hiredisClusterSystem(hiredisClusterSystem),
103     adapter(adapter),
104     acc(nullptr),
105     serviceState(ServiceState::DISCONNECTED),
106     clientCallbacksEnabled(true),
107     connectionRetryTimer(engine),
108     connectionRetryTimerDuration(std::chrono::seconds(1)),
109     logger(logger)
110 {
111     connect();
112 }
113
114 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
115 {
116     disconnectHiredisCluster();
117 }
118
119 void AsyncHiredisClusterCommandDispatcher::connect()
120 {
121     // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
122     disconnectHiredisCluster();
123     acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
124                                                         HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
125     if (acc == nullptr)
126     {
127         logger->error() << "SDL: connecting to redis cluster failed, null context returned";
128         armConnectionRetryTimer();
129         return;
130     }
131     if (acc->err)
132     {
133         logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
134         armConnectionRetryTimer();
135         return;
136     }
137     acc->data = this;
138     adapter->setup(acc);
139     hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
140     hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
141     verifyConnection();
142 }
143
144 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
145 {
146     /* redisClusterAsyncConnect only queries available cluster nodes but it does
147      * not connect to any cluster node (as it does not know to which node it should connect to).
148      * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
149      * determined to which cluster node this instance will connect to. We do initial operation
150      * to get connection to right redis node established already now. This also verifies that
151      * connection really works. When Redis has max amount of users, it will still accept new
152      * connections but is will close them immediately. Therefore, we need to verify that just
153      * established connection really works.
154      */
155     /* Connection setup/verification is now done by doing redis command list query. Because we anyway
156      * need to verify that Redis has required commands, we can now combine these two operations
157      * (command list query and connection setup/verification). If either one of the functionalities
158      * is not needed in the future and it is removed, remember to still leave the other one.
159      */
160     /* Non namespace-specific command list query can be used for connection setup purposes,
161      * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
162      * commands dispacthed.
163      */
164
165     /* If initial namespace was not given during dispatcher creation (multi namespace API),
166      * verification is sent to hardcoded namespace. This works for verification purposes
167      * because in our environment cluster is configured to operate only if all nodes
168      * are working (so we can do verification to any node). However, this is not optimal
169      * because we do not necessarily connect to such cluster node which will later be
170      * used by client. Also our cluster configuration can change. This needs to be
171      * optimized later (perhaps to connect to all nodes). */
172     std::string nsForVerification;
173     if (initialNamespace)
174         nsForVerification = *initialNamespace;
175     else
176         nsForVerification = "namespace";
177
178     dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
179                             this,
180                             std::placeholders::_1,
181                             std::placeholders::_2),
182                   nsForVerification,
183                   contentsBuilder->build("COMMAND"),
184                   false);
185 }
186
187 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
188 {
189     if(error)
190     {
191         logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
192                         << error.message();
193         armConnectionRetryTimer();
194     }
195     else
196     {
197         if (checkRedisModuleCommands(parseCommandListReply(reply)))
198             setConnected();
199         else
200             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
201     }
202 }
203
204 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
205 {
206     this->connectAck = connectAck;
207     if (serviceState == ServiceState::CONNECTED)
208         engine.postCallback(connectAck);
209 }
210
211 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
212 {
213     disconnectCallback = disconnectCb;
214 }
215
216 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
217                                                          const AsyncConnection::Namespace& ns,
218                                                          const Contents& contents)
219 {
220     dispatchAsync(commandCb, ns, contents, true);
221 }
222
223 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
224                                                          const AsyncConnection::Namespace& ns,
225                                                          const Contents& contents,
226                                                          bool checkConnectionState)
227 {
228     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
229     {
230         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
231                                        this,
232                                        commandCb,
233                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
234         return;
235     }
236     cbs.push_back(commandCb);
237     std::vector<const char*> chars;
238     std::transform(contents.stack.begin(), contents.stack.end(),
239                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
240     if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
241                                                                  static_cast<int>(contents.stack.size()), &chars[0],
242                                                                  &contents.sizes[0]) != REDIS_OK)
243     {
244         removeCb(cbs.back());
245         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
246                                        this,
247                                        commandCb,
248                                        getRedisError(acc->err, acc->errstr, nullptr)));
249     }
250 }
251
252 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
253 {
254     clientCallbacksEnabled = false;
255 }
256
257 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
258 {
259     commandCb(error, AsyncRedisReply());
260 }
261
262 void AsyncHiredisClusterCommandDispatcher::setConnected()
263 {
264     serviceState = ServiceState::CONNECTED;
265
266     if (connectAck)
267     {
268         connectAck();
269         connectAck = ConnectAck();
270     }
271 }
272
273 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
274 {
275     connectionRetryTimer.arm(connectionRetryTimerDuration,
276                              [this] () { connect(); });
277
278 }
279
280 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
281                                                        const std::error_code& error,
282                                                        const redisReply* rr)
283 {
284     if (!isValidCb(commandCb))
285         SHAREDDATALAYER_ABORT("Invalid callback function.");
286     if (error)
287         commandCb(error, AsyncRedisReply());
288     else
289         commandCb(error, AsyncRedisReply(*rr));
290     if (!usePermanentCommandCallbacks)
291         removeCb(commandCb);
292 }
293
294 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
295 {
296     return clientCallbacksEnabled;
297 }
298
299 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
300 {
301     for (auto i(cbs.begin()); i != cbs.end(); ++i)
302         if (&*i == &commandCb)
303             return true;
304     return false;
305 }
306
307 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
308 {
309     for (auto i(cbs.begin()); i != cbs.end(); ++i)
310         if (&*i == &commandCb)
311         {
312             cbs.erase(i);
313             break;
314         }
315 }
316
317 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
318 {
319     adapter->detach(ac);
320
321     if (disconnectCallback)
322         disconnectCallback();
323 }
324
325 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
326 {
327     /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
328      * if acc is a valid pointer).
329      */
330     if (serviceState == ServiceState::CONNECTED)
331         hiredisClusterSystem.redisClusterAsyncFree(acc);
332
333     serviceState = ServiceState::DISCONNECTED;
334 }