ac4fc34282b4c48be1b3d72e1b7ae0abee08d7e6
[ric-plt/sdl.git] / src / redis / asynchiredisclustercommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
23 #include <algorithm>
24 #include <cstring>
25 #include <cerrno>
26 #include <sstream>
27 #include "private/abort.hpp"
28 #include "private/createlogger.hpp"
29 #include "private/error.hpp"
30 #include "private/logger.hpp"
31 #include "private/redis/asyncredisreply.hpp"
32 #include "private/redis/reply.hpp"
33 #include "private/redis/hiredisclustersystem.hpp"
34 #include "private/engine.hpp"
35 #include "private/redis/hiredisclusterepolladapter.hpp"
36 #include "private/redis/contents.hpp"
37 #include "private/redis/redisgeneral.hpp"
38
39 using namespace shareddatalayer;
40 using namespace shareddatalayer::redis;
41
42 namespace
43 {
44     void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
45     {
46         if (!status)
47         {
48             std::ostringstream msg;
49             msg << "redis cluster instance connected, fd: " << ac->c.fd;
50             logDebugOnce(msg.str());
51         }
52     }
53
54     void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
55     {
56         if (status)
57         {
58             std::ostringstream msg;
59             msg << "redis cluster instance disconnected, fd: " << ac->c.fd
60                 << ", status: " << ac->err;
61             logDebugOnce(msg.str());
62         }
63         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
64         instance->handleDisconnect(ac);
65     }
66
67     void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
68     {
69         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
70         auto reply(static_cast<redisReply*>(rr));
71         auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
72         if (instance->isClientCallbacksEnabled())
73             instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
74     }
75 }
76
77 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
78                                                                            const boost::optional<std::string>& ns,
79                                                                            const DatabaseConfiguration::Addresses& addresses,
80                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
81                                                                            bool usePermanentCommandCallbacks,
82                                                                            std::shared_ptr<Logger> logger):
83     AsyncHiredisClusterCommandDispatcher(engine,
84                                          ns,
85                                          addresses,
86                                          contentsBuilder,
87                                          usePermanentCommandCallbacks,
88                                          HiredisClusterSystem::getInstance(),
89                                          std::make_shared<HiredisClusterEpollAdapter>(engine),
90                                          logger)
91 {
92 }
93
94 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
95                                                                            const boost::optional<std::string>& ns,
96                                                                            const DatabaseConfiguration::Addresses& addresses,
97                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
98                                                                            bool usePermanentCommandCallbacks,
99                                                                            HiredisClusterSystem& hiredisClusterSystem,
100                                                                            std::shared_ptr<HiredisClusterEpollAdapter> adapter,
101                                                                            std::shared_ptr<Logger> logger):
102     engine(engine),
103     initialNamespace(ns),
104     addresses(addresses),
105     contentsBuilder(contentsBuilder),
106     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
107     hiredisClusterSystem(hiredisClusterSystem),
108     adapter(adapter),
109     acc(nullptr),
110     serviceState(ServiceState::DISCONNECTED),
111     clientCallbacksEnabled(true),
112     connectionRetryTimer(engine),
113     connectionRetryTimerDuration(std::chrono::seconds(1)),
114     logger(logger)
115 {
116     connect();
117 }
118
119 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
120 {
121     disconnectHiredisCluster();
122 }
123
124 void AsyncHiredisClusterCommandDispatcher::connect()
125 {
126     // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
127     disconnectHiredisCluster();
128     acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
129                                                         HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
130     if (acc == nullptr)
131     {
132         logger->error() << "SDL: connecting to redis cluster failed, null context returned";
133         armConnectionRetryTimer();
134         return;
135     }
136     if (acc->err)
137     {
138         logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
139         armConnectionRetryTimer();
140         return;
141     }
142     acc->data = this;
143     adapter->setup(acc);
144     hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
145     hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
146     verifyConnection();
147 }
148
149 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
150 {
151     /* redisClusterAsyncConnect only queries available cluster nodes but it does
152      * not connect to any cluster node (as it does not know to which node it should connect to).
153      * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
154      * determined to which cluster node this instance will connect to. We do initial operation
155      * to get connection to right redis node established already now. This also verifies that
156      * connection really works. When Redis has max amount of users, it will still accept new
157      * connections but is will close them immediately. Therefore, we need to verify that just
158      * established connection really works.
159      */
160     /* Connection setup/verification is now done by doing redis command list query. Because we anyway
161      * need to verify that Redis has required commands, we can now combine these two operations
162      * (command list query and connection setup/verification). If either one of the functionalities
163      * is not needed in the future and it is removed, remember to still leave the other one.
164      */
165     /* Non namespace-specific command list query can be used for connection setup purposes,
166      * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
167      * commands dispacthed.
168      */
169
170     /* If initial namespace was not given during dispatcher creation (multi namespace API),
171      * verification is sent to hardcoded namespace. This works for verification purposes
172      * because in our environment cluster is configured to operate only if all nodes
173      * are working (so we can do verification to any node). However, this is not optimal
174      * because we do not necessarily connect to such cluster node which will later be
175      * used by client. Also our cluster configuration can change. This needs to be
176      * optimized later (perhaps to connect to all nodes). */
177     std::string nsForVerification;
178     if (initialNamespace)
179         nsForVerification = *initialNamespace;
180     else
181         nsForVerification = "namespace";
182
183     dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
184                             this,
185                             std::placeholders::_1,
186                             std::placeholders::_2),
187                   nsForVerification,
188                   contentsBuilder->build("COMMAND"),
189                   false);
190 }
191
192 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
193 {
194     if(error)
195     {
196         logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
197                         << error.message();
198         armConnectionRetryTimer();
199     }
200     else
201     {
202         if (checkRedisModuleCommands(parseCommandListReply(reply)))
203             setConnected();
204         else
205             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
206     }
207 }
208
209 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
210 {
211     this->connectAck = connectAck;
212     if (serviceState == ServiceState::CONNECTED)
213         engine.postCallback(connectAck);
214 }
215
216 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
217 {
218     disconnectCallback = disconnectCb;
219 }
220
221 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
222                                                          const AsyncConnection::Namespace& ns,
223                                                          const Contents& contents)
224 {
225     dispatchAsync(commandCb, ns, contents, true);
226 }
227
228 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
229                                                          const AsyncConnection::Namespace& ns,
230                                                          const Contents& contents,
231                                                          bool checkConnectionState)
232 {
233     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
234     {
235         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
236                                        this,
237                                        commandCb,
238                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
239         return;
240     }
241     cbs.push_back(commandCb);
242     std::vector<const char*> chars;
243     std::transform(contents.stack.begin(), contents.stack.end(),
244                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
245     if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
246                                                                  static_cast<int>(contents.stack.size()), &chars[0],
247                                                                  &contents.sizes[0]) != REDIS_OK)
248     {
249         removeCb(cbs.back());
250         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
251                                        this,
252                                        commandCb,
253                                        getRedisError(acc->err, acc->errstr, nullptr)));
254     }
255 }
256
257 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
258 {
259     clientCallbacksEnabled = false;
260 }
261
262 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
263 {
264     commandCb(error, AsyncRedisReply());
265 }
266
267 void AsyncHiredisClusterCommandDispatcher::setConnected()
268 {
269     serviceState = ServiceState::CONNECTED;
270
271     if (connectAck)
272     {
273         connectAck();
274         connectAck = ConnectAck();
275     }
276 }
277
278 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
279 {
280     connectionRetryTimer.arm(connectionRetryTimerDuration,
281                              [this] () { connect(); });
282
283 }
284
285 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
286                                                        const std::error_code& error,
287                                                        const redisReply* rr)
288 {
289     if (!isValidCb(commandCb))
290         SHAREDDATALAYER_ABORT("Invalid callback function.");
291     if (error)
292         commandCb(error, AsyncRedisReply());
293     else
294         commandCb(error, AsyncRedisReply(*rr));
295     if (!usePermanentCommandCallbacks)
296         removeCb(commandCb);
297 }
298
299 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
300 {
301     return clientCallbacksEnabled;
302 }
303
304 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
305 {
306     for (auto i(cbs.begin()); i != cbs.end(); ++i)
307         if (&*i == &commandCb)
308             return true;
309     return false;
310 }
311
312 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
313 {
314     for (auto i(cbs.begin()); i != cbs.end(); ++i)
315         if (&*i == &commandCb)
316         {
317             cbs.erase(i);
318             break;
319         }
320 }
321
322 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
323 {
324     adapter->detach(ac);
325
326     if (disconnectCallback)
327         disconnectCallback();
328 }
329
330 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
331 {
332     /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
333      * if acc is a valid pointer).
334      */
335     if (serviceState == ServiceState::CONNECTED)
336         hiredisClusterSystem.redisClusterAsyncFree(acc);
337
338     serviceState = ServiceState::DISCONNECTED;
339 }