Add definable timeout for SyncStorage APIs
[ric-plt/sdl.git] / src / redis / asynchiredisclustercommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "private/redis/asynchiredisclustercommanddispatcher.hpp"
23 #include <algorithm>
24 #include <cstring>
25 #include <cerrno>
26 #include <sstream>
27 #include "private/abort.hpp"
28 #include "private/createlogger.hpp"
29 #include "private/error.hpp"
30 #include "private/logger.hpp"
31 #include "private/redis/asyncredisreply.hpp"
32 #include "private/redis/reply.hpp"
33 #include "private/redis/hiredisclustersystem.hpp"
34 #include "private/engine.hpp"
35 #include "private/redis/hiredisclusterepolladapter.hpp"
36 #include "private/redis/contents.hpp"
37 #include "private/redis/redisgeneral.hpp"
38
39 using namespace shareddatalayer;
40 using namespace shareddatalayer::redis;
41
42 namespace
43 {
44     void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
45     {
46         if (!status)
47         {
48             std::ostringstream msg;
49             msg << "redis cluster instance connected, fd: " << ac->c.fd;
50             logDebugOnce(msg.str());
51         }
52     }
53
54     void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
55     {
56         std::ostringstream msg;
57         msg << "redis cluster instance disconnected, status: " << ac->err
58             << ", " << ac->errstr << ", fd: " << ac->c.fd << std::endl;
59         logDebugOnce(msg.str());
60
61         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
62         instance->handleDisconnect(ac);
63     }
64
65     void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
66     {
67         auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
68         auto reply(static_cast<redisReply*>(rr));
69         auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
70         if (instance->isClientCallbacksEnabled())
71             instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
72     }
73 }
74
75 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
76                                                                            const boost::optional<std::string>& ns,
77                                                                            const DatabaseConfiguration::Addresses& addresses,
78                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
79                                                                            bool usePermanentCommandCallbacks,
80                                                                            std::shared_ptr<Logger> logger):
81     AsyncHiredisClusterCommandDispatcher(engine,
82                                          ns,
83                                          addresses,
84                                          contentsBuilder,
85                                          usePermanentCommandCallbacks,
86                                          HiredisClusterSystem::getInstance(),
87                                          std::make_shared<HiredisClusterEpollAdapter>(engine),
88                                          logger)
89 {
90 }
91
92 AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
93                                                                            const boost::optional<std::string>& ns,
94                                                                            const DatabaseConfiguration::Addresses& addresses,
95                                                                            std::shared_ptr<ContentsBuilder> contentsBuilder,
96                                                                            bool usePermanentCommandCallbacks,
97                                                                            HiredisClusterSystem& hiredisClusterSystem,
98                                                                            std::shared_ptr<HiredisClusterEpollAdapter> adapter,
99                                                                            std::shared_ptr<Logger> logger):
100     engine(engine),
101     initialNamespace(ns),
102     addresses(addresses),
103     contentsBuilder(contentsBuilder),
104     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
105     hiredisClusterSystem(hiredisClusterSystem),
106     adapter(adapter),
107     acc(nullptr),
108     serviceState(ServiceState::DISCONNECTED),
109     clientCallbacksEnabled(true),
110     connectionRetryTimer(engine),
111     connectionRetryTimerDuration(std::chrono::seconds(1)),
112     logger(logger)
113 {
114     connect();
115 }
116
117 AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
118 {
119     disconnectHiredisCluster();
120 }
121
122 void AsyncHiredisClusterCommandDispatcher::connect()
123 {
124     // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
125     disconnectHiredisCluster();
126     acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
127                                                         HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
128     if (acc == nullptr)
129     {
130         logger->error() << "SDL: connecting to redis cluster failed, null context returned";
131         armConnectionRetryTimer();
132         return;
133     }
134     if (acc->err)
135     {
136         logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
137         armConnectionRetryTimer();
138         return;
139     }
140     acc->data = this;
141     adapter->setup(acc);
142     hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
143     hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
144     verifyConnection();
145 }
146
147 void AsyncHiredisClusterCommandDispatcher::verifyConnection()
148 {
149     /* redisClusterAsyncConnect only queries available cluster nodes but it does
150      * not connect to any cluster node (as it does not know to which node it should connect to).
151      * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
152      * determined to which cluster node this instance will connect to. We do initial operation
153      * to get connection to right redis node established already now. This also verifies that
154      * connection really works. When Redis has max amount of users, it will still accept new
155      * connections but is will close them immediately. Therefore, we need to verify that just
156      * established connection really works.
157      */
158     /* Connection setup/verification is now done by doing redis command list query. Because we anyway
159      * need to verify that Redis has required commands, we can now combine these two operations
160      * (command list query and connection setup/verification). If either one of the functionalities
161      * is not needed in the future and it is removed, remember to still leave the other one.
162      */
163     /* Non namespace-specific command list query can be used for connection setup purposes,
164      * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
165      * commands dispacthed.
166      */
167
168     /* If initial namespace was not given during dispatcher creation (multi namespace API),
169      * verification is sent to hardcoded namespace. This works for verification purposes
170      * because in our environment cluster is configured to operate only if all nodes
171      * are working (so we can do verification to any node). However, this is not optimal
172      * because we do not necessarily connect to such cluster node which will later be
173      * used by client. Also our cluster configuration can change. This needs to be
174      * optimized later (perhaps to connect to all nodes). */
175     std::string nsForVerification;
176     if (initialNamespace)
177         nsForVerification = *initialNamespace;
178     else
179         nsForVerification = "namespace";
180
181     dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
182                             this,
183                             std::placeholders::_1,
184                             std::placeholders::_2),
185                   nsForVerification,
186                   contentsBuilder->build("COMMAND"),
187                   false);
188 }
189
190 void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
191 {
192     if(error)
193     {
194         logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
195                         << error.message();
196         armConnectionRetryTimer();
197     }
198     else
199     {
200         if (checkRedisModuleCommands(parseCommandListReply(reply)))
201             setConnected();
202         else
203             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
204     }
205 }
206
207 void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
208 {
209     this->connectAck = connectAck;
210     if (serviceState == ServiceState::CONNECTED)
211         engine.postCallback(connectAck);
212 }
213
214 void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
215 {
216     disconnectCallback = disconnectCb;
217 }
218
219 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
220                                                          const AsyncConnection::Namespace& ns,
221                                                          const Contents& contents)
222 {
223     dispatchAsync(commandCb, ns, contents, true);
224 }
225
226 void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
227                                                          const AsyncConnection::Namespace& ns,
228                                                          const Contents& contents,
229                                                          bool checkConnectionState)
230 {
231     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
232     {
233         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
234                                        this,
235                                        commandCb,
236                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
237         return;
238     }
239     cbs.push_back(commandCb);
240     std::vector<const char*> chars;
241     std::transform(contents.stack.begin(), contents.stack.end(),
242                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
243     if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
244                                                                  static_cast<int>(contents.stack.size()), &chars[0],
245                                                                  &contents.sizes[0]) != REDIS_OK)
246     {
247         removeCb(cbs.back());
248         engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
249                                        this,
250                                        commandCb,
251                                        getRedisError(acc->err, acc->errstr, nullptr)));
252     }
253 }
254
255 void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
256 {
257     clientCallbacksEnabled = false;
258 }
259
260 void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
261 {
262     commandCb(error, AsyncRedisReply());
263 }
264
265 void AsyncHiredisClusterCommandDispatcher::setConnected()
266 {
267     serviceState = ServiceState::CONNECTED;
268
269     if (connectAck)
270     {
271         connectAck();
272         connectAck = ConnectAck();
273     }
274 }
275
276 void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
277 {
278     connectionRetryTimer.arm(connectionRetryTimerDuration,
279                              [this] () { connect(); });
280
281 }
282
283 void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
284                                                        const std::error_code& error,
285                                                        const redisReply* rr)
286 {
287     if (!isValidCb(commandCb))
288         SHAREDDATALAYER_ABORT("Invalid callback function.");
289     if (error)
290         commandCb(error, AsyncRedisReply());
291     else
292         commandCb(error, AsyncRedisReply(*rr));
293     if (!usePermanentCommandCallbacks)
294         removeCb(commandCb);
295 }
296
297 bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
298 {
299     return clientCallbacksEnabled;
300 }
301
302 bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
303 {
304     for (auto i(cbs.begin()); i != cbs.end(); ++i)
305         if (&*i == &commandCb)
306             return true;
307     return false;
308 }
309
310 void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
311 {
312     for (auto i(cbs.begin()); i != cbs.end(); ++i)
313         if (&*i == &commandCb)
314         {
315             cbs.erase(i);
316             break;
317         }
318 }
319
320 void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
321 {
322     adapter->detach(ac);
323
324     if (disconnectCallback)
325         disconnectCallback();
326 }
327
328 void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
329 {
330     /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
331      * if acc is a valid pointer).
332      */
333     if (serviceState == ServiceState::CONNECTED)
334         hiredisClusterSystem.redisClusterAsyncFree(acc);
335
336     serviceState = ServiceState::DISCONNECTED;
337 }