Add Sentinel configuration reading
[ric-plt/sdl.git] / src / redis / asyncsentineldatabasediscovery.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include <arpa/inet.h>
18 #include <boost/algorithm/string.hpp>
19 #include <iostream>
20 #include <string>
21 #include <vector>
22 #include <sdl/asyncstorage.hpp>
23 #include "private/abort.hpp"
24 #include "private/hostandport.hpp"
25 #include "private/redis/asyncsentineldatabasediscovery.hpp"
26 #include "private/redis/asynccommanddispatcher.hpp"
27 #include "private/redis/contents.hpp"
28 #include "private/redis/contentsbuilder.hpp"
29 #include "private/redis/reply.hpp"
30
31 using namespace shareddatalayer;
32 using namespace shareddatalayer::redis;
33
34 namespace
35 {
36     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
37                                                                           const DatabaseInfo& databaseInfo,
38                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
39                                                                           std::shared_ptr<Logger> logger,
40                                                                           bool usePermanentCommandCallbacks);
41
42     struct SubscribeReply
43     {
44         enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
45         Type type;
46         std::string message;
47
48         SubscribeReply(): type(Type::UNKNOWN) { }
49     };
50
51     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
52
53     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
54
55     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
56 }
57
58 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
59                                                                std::shared_ptr<Logger> logger,
60                                                                const HostAndPort& sentinelAddress,
61                                                                const std::string& sentinelMasterName):
62         AsyncSentinelDatabaseDiscovery(engine,
63                                        logger,
64                                        sentinelAddress,
65                                        sentinelMasterName,
66                                        ::asyncCommandDispatcherCreator,
67                                        std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
68 {
69 }
70
71 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
72                                                                std::shared_ptr<Logger> logger,
73                                                                const HostAndPort& sentinelAddress,
74                                                                const std::string& sentinelMasterName,
75                                                                const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
76                                                                std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
77         engine(engine),
78         logger(logger),
79         databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
80                      DatabaseInfo::Type::SINGLE,
81                      boost::none,
82                      DatabaseInfo::Discovery::SENTINEL})),
83         sentinelMasterName(sentinelMasterName),
84         contentsBuilder(contentsBuilder),
85         subscribeRetryTimer(*engine),
86         subscribeRetryTimerDuration(std::chrono::seconds(1)),
87         masterInquiryRetryTimer(*engine),
88         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
89 {
90     subscriber = asyncCommandDispatcherCreator(*engine,
91                                                databaseInfo,
92                                                contentsBuilder,
93                                                logger,
94                                                true);
95     dispatcher = asyncCommandDispatcherCreator(*engine,
96                                                databaseInfo,
97                                                contentsBuilder,
98                                                logger,
99                                                false);
100 }
101
102 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
103 {
104     if (subscriber)
105         subscriber->disableCommandCallbacks();
106     if (dispatcher)
107         dispatcher->disableCommandCallbacks();
108     stateChangedCb = nullptr;
109 }
110
111 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
112 {
113     stateChangedCb = cb;
114     subscriber->registerDisconnectCb([this]()
115             {
116                 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
117             });
118     subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
119 }
120
121 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
122 {
123     stateChangedCb = nullptr;
124 }
125
126 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
127 {
128     subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
129                                         this,
130                                         std::placeholders::_1,
131                                         std::placeholders::_2),
132                               "dummyNamespace", // Not meaningful for Sentinel
133                               contentsBuilder->build("SUBSCRIBE", "+switch-master"));
134 }
135
136 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
137                                                   const Reply& reply)
138 {
139     if (!error)
140     {
141         auto subscribeReply = parseSubscribeReply(reply, *logger);
142         if (subscribeReply)
143         {
144             switch (subscribeReply->type)
145             {
146                 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
147                 {
148                     dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
149                     break;
150                 }
151                 case (SubscribeReply::Type::NOTIFICATION):
152                 {
153                     auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
154                     if (hostAndPort)
155                     {
156                         auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
157                                                        DatabaseInfo::Type::SINGLE,
158                                                        boost::none,
159                                                        DatabaseInfo::Discovery::SENTINEL}));
160                         if (stateChangedCb)
161                             stateChangedCb(databaseInfo);
162                     }
163                     else
164                         SHAREDDATALAYER_ABORT("Notification message parsing error.");
165                     break;
166                 }
167                 case (SubscribeReply::Type::UNKNOWN):
168                 {
169                     logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
170                     SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
171                 }
172             }
173         }
174         else
175             SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
176     }
177     else
178         subscribeRetryTimer.arm(
179                 subscribeRetryTimerDuration,
180                 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
181 }
182
183 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
184 {
185     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
186                                         this,
187                                         std::placeholders::_1,
188                                         std::placeholders::_2),
189                               "dummyNamespace", // Not meaningful for Sentinel
190                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
191 }
192
193 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
194                                                       const Reply& reply)
195 {
196     if (!error)
197     {
198         auto hostAndPort = parseMasterInquiryReply(reply, *logger);
199         if (hostAndPort)
200         {
201             auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
202                                            DatabaseInfo::Type::SINGLE,
203                                            boost::none,
204                                            DatabaseInfo::Discovery::SENTINEL}));
205             if (stateChangedCb)
206                 stateChangedCb(databaseInfo);
207         }
208         else
209             SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
210     }
211     else
212     {
213         masterInquiryRetryTimer.arm(
214                 masterInquiryRetryTimerDuration,
215                 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
216     }
217 }
218
219 namespace
220 {
221     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
222                                                                           const DatabaseInfo& databaseInfo,
223                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
224                                                                           std::shared_ptr<Logger> logger,
225                                                                           bool usePermanentCommandCallbacks)
226     {
227         return AsyncCommandDispatcher::create(engine,
228                                               databaseInfo,
229                                               contentsBuilder,
230                                               usePermanentCommandCallbacks,
231                                               logger,
232                                               true);
233     }
234
235     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
236     {
237         // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
238         auto replyType = reply.getType();
239         if (replyType == Reply::Type::ARRAY)
240         {
241             auto& replyVector(*reply.getArray());
242             auto firstElementType = replyVector[0]->getType();
243             if (firstElementType == Reply::Type::STRING)
244             {
245                 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
246                 auto kind(replyVector[0]->getString()->str);
247                 if (kind == "subscribe")
248                 {
249                     subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
250                     return subscribeReply;
251                 }
252                 else if (kind == "message")
253                 {
254                     subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
255                     auto thirdElementType = replyVector[2]->getType();
256                     if (thirdElementType == Reply::Type::STRING)
257                     {
258                         subscribeReply->message = replyVector[2]->getString()->str;
259                         return subscribeReply;
260                     }
261                     else
262                         logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
263                 }
264                 else
265                     logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
266             }
267             else
268                 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
269                                << static_cast<int>(firstElementType) << std::endl;
270         }
271         else
272             logger.debug() << "Invalid SUBSCRIBE reply type: "
273                            << static_cast<int>(replyType) << std::endl;
274         return nullptr;
275     }
276
277     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
278     {
279         auto replyType = reply.getType();
280         if (replyType == Reply::Type::ARRAY)
281         {
282             auto& replyVector(*reply.getArray());
283             auto hostElementType = replyVector[0]->getType();
284             if (hostElementType == Reply::Type::STRING)
285             {
286                 auto host(replyVector[0]->getString()->str);
287                 auto portElementType = replyVector[1]->getType();
288                 if (portElementType == Reply::Type::STRING)
289                 {
290                     auto port(replyVector[1]->getString()->str);
291                     try
292                     {
293                         return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
294                     }
295                     catch (const std::exception& e)
296                     {
297                         logger.debug() << "Invalid host or port in master inquiry reply, host: "
298                                        << host << ", port: " << port
299                                        << ", exception: " << e.what() << std::endl;
300                     }
301                 }
302                 else
303                     logger.debug() << "Invalid port element type in master inquiry reply: "
304                                    << static_cast<int>(portElementType) << std::endl;
305             }
306             else
307                 logger.debug() << "Invalid host element type in master inquiry reply: "
308                                << static_cast<int>(hostElementType) << std::endl;
309         }
310         else
311             logger.debug() << "Invalid master inquiry reply type: "
312                            << static_cast<int>(replyType) << std::endl;
313         return nullptr;
314     }
315
316     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
317     {
318         std::vector<std::string> splittedMessage;
319         boost::split(splittedMessage, message, boost::is_any_of(" "));
320         if (splittedMessage.size() == 5)
321         {
322             auto host = splittedMessage[3];
323             auto port = splittedMessage[4];
324             try
325             {
326                 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
327             }
328             catch (const std::exception& e)
329             {
330                 logger.debug() << "Invalid host or port in notification message, host: "
331                                << host << ", port: " << port
332                                << ", exception: " << e.what() << std::endl;
333             }
334         }
335         else
336             logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
337         return nullptr;
338     }
339 }