RIC:1060: Change in PTL
[ric-plt/sdl.git] / src / redis / asynchirediscommanddispatcher.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "private/redis/asynchirediscommanddispatcher.hpp"
23 #include <algorithm>
24 #include <cstring>
25 #include <cerrno>
26 #include <sstream>
27 #include <arpa/inet.h>
28 #include "private/abort.hpp"
29 #include "private/createlogger.hpp"
30 #include "private/engine.hpp"
31 #include "private/error.hpp"
32 #include "private/logger.hpp"
33 #include "private/redis/asyncredisreply.hpp"
34 #include "private/redis/reply.hpp"
35 #include "private/redis/hiredissystem.hpp"
36 #include "private/redis/hiredisepolladapter.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/redis/redisgeneral.hpp"
39
40 using namespace shareddatalayer;
41 using namespace shareddatalayer::redis;
42
43 namespace
44 {
45     void connectCb(const redisAsyncContext* ac, int status)
46     {
47         bool isConnected = !status;
48         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
49
50         if (isConnected)
51         {
52             std::ostringstream msg;
53             msg << "redis connected, fd: " << ac->c.fd;
54             logInfoOnce(msg.str());
55             instance->verifyConnection();
56         }
57         else
58             instance->setDisconnected();
59
60     }
61
62     void disconnectCb(const redisAsyncContext* ac, int status)
63     {
64         if (status) {
65             std::ostringstream msg;
66             msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
67             logErrorOnce(msg.str());
68         }
69         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
70         instance->setDisconnected();
71     }
72
73     void cb(redisAsyncContext* ac, void* rr, void* pd)
74     {
75         auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
76         auto reply(static_cast<redisReply*>(rr));
77         auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
78         if (instance->isClientCallbacksEnabled())
79             instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
80     }
81 }
82
83 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
84                                                              const std::string& address,
85                                                              uint16_t port,
86                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
87                                                              bool usePermanentCommandCallbacks,
88                                                              std::shared_ptr<Logger> logger,
89                                                              bool usedForSentinel):
90     AsyncHiredisCommandDispatcher(engine,
91                                   address,
92                                   port,
93                                   contentsBuilder,
94                                   usePermanentCommandCallbacks,
95                                   HiredisSystem::getHiredisSystem(),
96                                   std::make_shared<HiredisEpollAdapter>(engine),
97                                   logger,
98                                   usedForSentinel)
99 {
100 }
101
102 AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
103                                                              const std::string& address,
104                                                              uint16_t port,
105                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
106                                                              bool usePermanentCommandCallbacks,
107                                                              HiredisSystem& hiredisSystem,
108                                                              std::shared_ptr<HiredisEpollAdapter> adapter,
109                                                              std::shared_ptr<Logger> logger,
110                                                              bool usedForSentinel):
111     engine(engine),
112     address(address),
113     port(ntohs(port)),
114     contentsBuilder(contentsBuilder),
115     usePermanentCommandCallbacks(usePermanentCommandCallbacks),
116     hiredisSystem(hiredisSystem),
117     adapter(adapter),
118     ac(nullptr),
119     serviceState(ServiceState::DISCONNECTED),
120     clientCallbacksEnabled(true),
121     connectionRetryTimer(engine),
122     connectionRetryTimerDuration(std::chrono::seconds(1)),
123     connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
124     logger(logger),
125     usedForSentinel(usedForSentinel)
126
127 {
128     connect();
129 }
130
131 AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
132 {
133     disconnectHiredis();
134 }
135
136 void AsyncHiredisCommandDispatcher::connect()
137 {
138     ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
139     if (ac == nullptr || ac->err)
140     {
141         setDisconnected();
142         return;
143     }
144     ac->data = this;
145     adapter->attach(ac);
146     hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
147     hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
148 }
149
150 void AsyncHiredisCommandDispatcher::verifyConnection()
151 {
152     if (usedForSentinel)
153         setConnected();
154     else
155     {
156        /* When Redis has max amount of users, it will still accept new connections but will
157         * close them immediately. Therefore, we need to verify that just established connection
158         * really works. This prevents calling client readyAck callback for a connection that
159         * will be terminated immediately.
160         */
161         /* Connection verification is now done by doing redis command list query. Because we anyway
162          * need to verify that Redis has required commands, we can now combine these two operations
163          * (command list query and connection verification). If either one of the functionalities
164          * is not needed in the future and it is removed, remember to still leave the other one.
165          */
166         serviceState = ServiceState::CONNECTION_VERIFICATION;
167         /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
168          * we are spontaneously connected to redis while timer is running. If connection verification
169          * fails, timer is armed again (normal handling in connection verification).
170          */
171         connectionRetryTimer.disarm();
172         dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
173                                 this,
174                                 std::placeholders::_1,
175                                 std::placeholders::_2),
176                       contentsBuilder->build("COMMAND"),
177                       false);
178     }
179 }
180
181 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
182                                                           const redis::Reply& reply)
183 {
184     if(error)
185     {
186         logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
187                         << error.message();
188
189         if (!connectionRetryTimer.isArmed())
190         {
191             /* Typically if connection verification fails, hiredis will call disconnect callback and
192              * whole connection establishment procedure will be restarted via that. To ensure that
193              * we will retry verification even if connection would not be disconnected this timer
194              * is set. If connection is later disconnected, this timer is disarmed (when disconnect
195              * callback handling arms this timer again).
196              */
197             armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
198                                     std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
199         }
200     }
201     else
202     {
203         if (checkRedisModuleCommands(parseCommandListReply(reply)))
204             setConnected();
205         else
206             SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
207     }
208 }
209
210 void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
211 {
212     this->connectAck = connectAck;
213     if (serviceState == ServiceState::CONNECTED)
214         engine.postCallback(connectAck);
215 }
216
217 void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
218 {
219     disconnectCallback = disconnectCb;
220 }
221
222 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
223                                                   const AsyncConnection::Namespace&,
224                                                   const Contents& contents)
225 {
226     dispatchAsync(commandCb, contents, true);
227 }
228
229 void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
230                                                   const Contents& contents,
231                                                   bool checkConnectionState)
232 {
233     if (checkConnectionState && serviceState != ServiceState::CONNECTED)
234     {
235         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
236                                        this,
237                                        commandCb,
238                                        std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
239         return;
240     }
241     cbs.push_back(commandCb);
242     std::vector<const char*> chars;
243     std::transform(contents.stack.begin(), contents.stack.end(),
244                    std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
245     if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
246                                             &chars[0], &contents.sizes[0]) != REDIS_OK)
247     {
248         removeCb(cbs.back());
249         engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
250                                        this,
251                                        commandCb,
252                                        getRedisError(ac->err, ac->errstr, nullptr)));
253     }
254 }
255
256 void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
257 {
258     clientCallbacksEnabled = false;
259 }
260
261 void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
262                                                            const std::error_code& error)
263 {
264     commandCb(error, AsyncRedisReply());
265 }
266
267 void AsyncHiredisCommandDispatcher::setConnected()
268 {
269     serviceState = ServiceState::CONNECTED;
270
271     if (connectAck)
272     {
273         connectAck();
274         connectAck = ConnectAck();
275     }
276 }
277
278 void AsyncHiredisCommandDispatcher::setDisconnected()
279 {
280     serviceState = ServiceState::DISCONNECTED;
281
282     if (disconnectCallback)
283         disconnectCallback();
284
285     armConnectionRetryTimer(connectionRetryTimerDuration,
286                             std::bind(&AsyncHiredisCommandDispatcher::connect, this));
287 }
288
289 void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
290                                                 const std::error_code& error,
291                                                 const redisReply* rr)
292 {
293     if (!isValidCb(commandCb))
294         SHAREDDATALAYER_ABORT("Invalid callback function.");
295     if (error)
296         commandCb(error, AsyncRedisReply());
297     else
298         commandCb(error, AsyncRedisReply(*rr));
299     if (!usePermanentCommandCallbacks)
300         removeCb(commandCb);
301 }
302
303 bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
304 {
305     return clientCallbacksEnabled;
306 }
307
308 bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
309 {
310     for (auto i(cbs.begin()); i != cbs.end(); ++i)
311         if (&*i == &commandCb)
312             return true;
313     return false;
314 }
315
316 void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
317 {
318     for (auto i(cbs.begin()); i != cbs.end(); ++i)
319         if (&*i == &commandCb)
320         {
321             cbs.erase(i);
322             break;
323         }
324 }
325
326 void AsyncHiredisCommandDispatcher::disconnectHiredis()
327 {
328     /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
329      * if ac is a valid pointer).
330      */
331     if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
332         hiredisSystem.redisAsyncFree(ac);
333
334     //disconnect callback handler will update serviceState
335 }
336
337 void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
338                                                             std::function<void()> retryAction)
339 {
340     connectionRetryTimer.arm(duration,
341                              [retryAction] () { retryAction(); });
342 }