be51c5e1af75d0d122093dfa2ebd7d7c5320791f
[ric-plt/sdl.git] / src / redis / asyncsentineldatabasediscovery.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include <arpa/inet.h>
18 #include <boost/algorithm/string.hpp>
19 #include <iostream>
20 #include <string>
21 #include <vector>
22 #include <sdl/asyncstorage.hpp>
23 #include "private/abort.hpp"
24 #include "private/hostandport.hpp"
25 #include "private/redis/asyncsentineldatabasediscovery.hpp"
26 #include "private/redis/asynccommanddispatcher.hpp"
27 #include "private/redis/contents.hpp"
28 #include "private/redis/contentsbuilder.hpp"
29 #include "private/redis/reply.hpp"
30
31 using namespace shareddatalayer;
32 using namespace shareddatalayer::redis;
33
34 namespace
35 {
36     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
37                                                                           const DatabaseInfo& databaseInfo,
38                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
39                                                                           std::shared_ptr<Logger> logger,
40                                                                           bool usePermanentCommandCallbacks);
41
42     struct SubscribeReply
43     {
44         enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
45         Type type;
46         std::string message;
47
48         SubscribeReply(): type(Type::UNKNOWN) { }
49     };
50
51     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
52
53     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
54
55     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
56 }
57
58 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
59                                                                std::shared_ptr<Logger> logger):
60         AsyncSentinelDatabaseDiscovery(engine,
61                                        logger,
62                                        ::asyncCommandDispatcherCreator,
63                                        std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
64 {
65 }
66
67 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
68                                                                std::shared_ptr<Logger> logger,
69                                                                const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
70                                                                std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
71         engine(engine),
72         logger(logger),
73         // @TODO Make configurable.
74         databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
75                      DatabaseInfo::Type::SINGLE,
76                      boost::none,
77                      DatabaseInfo::Discovery::SENTINEL})),
78         contentsBuilder(contentsBuilder),
79         subscribeRetryTimer(*engine),
80         subscribeRetryTimerDuration(std::chrono::seconds(1)),
81         masterInquiryRetryTimer(*engine),
82         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
83 {
84     subscriber = asyncCommandDispatcherCreator(*engine,
85                                                databaseInfo,
86                                                contentsBuilder,
87                                                logger,
88                                                true);
89     dispatcher = asyncCommandDispatcherCreator(*engine,
90                                                databaseInfo,
91                                                contentsBuilder,
92                                                logger,
93                                                false);
94 }
95
96 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
97 {
98     if (subscriber)
99         subscriber->disableCommandCallbacks();
100     if (dispatcher)
101         dispatcher->disableCommandCallbacks();
102     stateChangedCb = nullptr;
103 }
104
105 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
106 {
107     stateChangedCb = cb;
108     subscriber->registerDisconnectCb([this]()
109             {
110                 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
111             });
112     subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
113 }
114
115 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
116 {
117     stateChangedCb = nullptr;
118 }
119
120 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
121 {
122     subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
123                                         this,
124                                         std::placeholders::_1,
125                                         std::placeholders::_2),
126                               "dummyNamespace", // Not meaningful for Sentinel
127                               contentsBuilder->build("SUBSCRIBE", "+switch-master"));
128 }
129
130 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
131                                                   const Reply& reply)
132 {
133     if (!error)
134     {
135         auto subscribeReply = parseSubscribeReply(reply, *logger);
136         if (subscribeReply)
137         {
138             switch (subscribeReply->type)
139             {
140                 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
141                 {
142                     dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
143                     break;
144                 }
145                 case (SubscribeReply::Type::NOTIFICATION):
146                 {
147                     auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
148                     if (hostAndPort)
149                     {
150                         auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
151                                                        DatabaseInfo::Type::SINGLE,
152                                                        boost::none,
153                                                        DatabaseInfo::Discovery::SENTINEL}));
154                         if (stateChangedCb)
155                             stateChangedCb(databaseInfo);
156                     }
157                     else
158                         SHAREDDATALAYER_ABORT("Notification message parsing error.");
159                     break;
160                 }
161                 case (SubscribeReply::Type::UNKNOWN):
162                 {
163                     logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
164                     SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
165                 }
166             }
167         }
168         else
169             SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
170     }
171     else
172         subscribeRetryTimer.arm(
173                 subscribeRetryTimerDuration,
174                 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
175 }
176
177 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
178 {
179     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
180                                         this,
181                                         std::placeholders::_1,
182                                         std::placeholders::_2),
183                               "dummyNamespace", // Not meaningful for Sentinel
184                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
185 }
186
187 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
188                                                       const Reply& reply)
189 {
190     if (!error)
191     {
192         auto hostAndPort = parseMasterInquiryReply(reply, *logger);
193         if (hostAndPort)
194         {
195             auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
196                                            DatabaseInfo::Type::SINGLE,
197                                            boost::none,
198                                            DatabaseInfo::Discovery::SENTINEL}));
199             if (stateChangedCb)
200                 stateChangedCb(databaseInfo);
201         }
202         else
203             SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
204     }
205     else
206     {
207         masterInquiryRetryTimer.arm(
208                 masterInquiryRetryTimerDuration,
209                 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
210     }
211 }
212
213 namespace
214 {
215     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
216                                                                           const DatabaseInfo& databaseInfo,
217                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
218                                                                           std::shared_ptr<Logger> logger,
219                                                                           bool usePermanentCommandCallbacks)
220     {
221         return AsyncCommandDispatcher::create(engine,
222                                               databaseInfo,
223                                               contentsBuilder,
224                                               usePermanentCommandCallbacks,
225                                               logger,
226                                               true);
227     }
228
229     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
230     {
231         // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
232         auto replyType = reply.getType();
233         if (replyType == Reply::Type::ARRAY)
234         {
235             auto& replyVector(*reply.getArray());
236             auto firstElementType = replyVector[0]->getType();
237             if (firstElementType == Reply::Type::STRING)
238             {
239                 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
240                 auto kind(replyVector[0]->getString()->str);
241                 if (kind == "subscribe")
242                 {
243                     subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
244                     return subscribeReply;
245                 }
246                 else if (kind == "message")
247                 {
248                     subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
249                     auto thirdElementType = replyVector[2]->getType();
250                     if (thirdElementType == Reply::Type::STRING)
251                     {
252                         subscribeReply->message = replyVector[2]->getString()->str;
253                         return subscribeReply;
254                     }
255                     else
256                         logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
257                 }
258                 else
259                     logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
260             }
261             else
262                 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
263                                << static_cast<int>(firstElementType) << std::endl;
264         }
265         else
266             logger.debug() << "Invalid SUBSCRIBE reply type: "
267                            << static_cast<int>(replyType) << std::endl;
268         return nullptr;
269     }
270
271     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
272     {
273         auto replyType = reply.getType();
274         if (replyType == Reply::Type::ARRAY)
275         {
276             auto& replyVector(*reply.getArray());
277             auto hostElementType = replyVector[0]->getType();
278             if (hostElementType == Reply::Type::STRING)
279             {
280                 auto host(replyVector[0]->getString()->str);
281                 auto portElementType = replyVector[1]->getType();
282                 if (portElementType == Reply::Type::STRING)
283                 {
284                     auto port(replyVector[1]->getString()->str);
285                     try
286                     {
287                         return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
288                     }
289                     catch (const std::exception& e)
290                     {
291                         logger.debug() << "Invalid host or port in master inquiry reply, host: "
292                                        << host << ", port: " << port
293                                        << ", exception: " << e.what() << std::endl;
294                     }
295                 }
296                 else
297                     logger.debug() << "Invalid port element type in master inquiry reply: "
298                                    << static_cast<int>(portElementType) << std::endl;
299             }
300             else
301                 logger.debug() << "Invalid host element type in master inquiry reply: "
302                                << static_cast<int>(hostElementType) << std::endl;
303         }
304         else
305             logger.debug() << "Invalid master inquiry reply type: "
306                            << static_cast<int>(replyType) << std::endl;
307         return nullptr;
308     }
309
310     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
311     {
312         std::vector<std::string> splittedMessage;
313         boost::split(splittedMessage, message, boost::is_any_of(" "));
314         if (splittedMessage.size() == 5)
315         {
316             auto host = splittedMessage[3];
317             auto port = splittedMessage[4];
318             try
319             {
320                 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
321             }
322             catch (const std::exception& e)
323             {
324                 logger.debug() << "Invalid host or port in notification message, host: "
325                                << host << ", port: " << port
326                                << ", exception: " << e.what() << std::endl;
327             }
328         }
329         else
330             logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
331         return nullptr;
332     }
333 }