2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include <arpa/inet.h>
18 #include <boost/algorithm/string.hpp>
22 #include <sdl/asyncstorage.hpp>
23 #include "private/abort.hpp"
24 #include "private/hostandport.hpp"
25 #include "private/redis/asyncsentineldatabasediscovery.hpp"
26 #include "private/redis/asynccommanddispatcher.hpp"
27 #include "private/redis/contents.hpp"
28 #include "private/redis/contentsbuilder.hpp"
29 #include "private/redis/reply.hpp"
31 using namespace shareddatalayer;
32 using namespace shareddatalayer::redis;
36 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
37 const DatabaseInfo& databaseInfo,
38 std::shared_ptr<ContentsBuilder> contentsBuilder,
39 std::shared_ptr<Logger> logger,
40 bool usePermanentCommandCallbacks);
44 enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
48 SubscribeReply(): type(Type::UNKNOWN) { }
51 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
53 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
55 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
58 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
59 std::shared_ptr<Logger> logger,
60 const HostAndPort& sentinelAddress,
61 const std::string& sentinelMasterName):
62 AsyncSentinelDatabaseDiscovery(engine,
66 ::asyncCommandDispatcherCreator,
67 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
71 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
72 std::shared_ptr<Logger> logger,
73 const HostAndPort& sentinelAddress,
74 const std::string& sentinelMasterName,
75 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
76 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
79 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
80 DatabaseInfo::Type::SINGLE,
82 DatabaseInfo::Discovery::SENTINEL})),
83 sentinelMasterName(sentinelMasterName),
84 contentsBuilder(contentsBuilder),
85 subscribeRetryTimer(*engine),
86 subscribeRetryTimerDuration(std::chrono::seconds(1)),
87 masterInquiryRetryTimer(*engine),
88 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
90 subscriber = asyncCommandDispatcherCreator(*engine,
95 dispatcher = asyncCommandDispatcherCreator(*engine,
102 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
105 subscriber->disableCommandCallbacks();
107 dispatcher->disableCommandCallbacks();
108 stateChangedCb = nullptr;
111 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
114 subscriber->registerDisconnectCb([this]()
116 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
118 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
121 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
123 stateChangedCb = nullptr;
126 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
128 subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
130 std::placeholders::_1,
131 std::placeholders::_2),
132 "dummyNamespace", // Not meaningful for Sentinel
133 contentsBuilder->build("SUBSCRIBE", "+switch-master"));
136 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
141 auto subscribeReply = parseSubscribeReply(reply, *logger);
144 switch (subscribeReply->type)
146 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
148 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
151 case (SubscribeReply::Type::NOTIFICATION):
153 auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
156 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
157 DatabaseInfo::Type::SINGLE,
159 DatabaseInfo::Discovery::SENTINEL}));
161 stateChangedCb(databaseInfo);
164 SHAREDDATALAYER_ABORT("Notification message parsing error.");
167 case (SubscribeReply::Type::UNKNOWN):
169 logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
170 SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
175 SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
178 subscribeRetryTimer.arm(
179 subscribeRetryTimerDuration,
180 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
183 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
185 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
187 std::placeholders::_1,
188 std::placeholders::_2),
189 "dummyNamespace", // Not meaningful for Sentinel
190 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
193 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
198 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
201 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
202 DatabaseInfo::Type::SINGLE,
204 DatabaseInfo::Discovery::SENTINEL}));
206 stateChangedCb(databaseInfo);
209 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
213 masterInquiryRetryTimer.arm(
214 masterInquiryRetryTimerDuration,
215 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
221 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
222 const DatabaseInfo& databaseInfo,
223 std::shared_ptr<ContentsBuilder> contentsBuilder,
224 std::shared_ptr<Logger> logger,
225 bool usePermanentCommandCallbacks)
227 return AsyncCommandDispatcher::create(engine,
230 usePermanentCommandCallbacks,
235 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
237 // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
238 auto replyType = reply.getType();
239 if (replyType == Reply::Type::ARRAY)
241 auto& replyVector(*reply.getArray());
242 auto firstElementType = replyVector[0]->getType();
243 if (firstElementType == Reply::Type::STRING)
245 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
246 auto kind(replyVector[0]->getString()->str);
247 if (kind == "subscribe")
249 subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
250 return subscribeReply;
252 else if (kind == "message")
254 subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
255 auto thirdElementType = replyVector[2]->getType();
256 if (thirdElementType == Reply::Type::STRING)
258 subscribeReply->message = replyVector[2]->getString()->str;
259 return subscribeReply;
262 logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
265 logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
268 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
269 << static_cast<int>(firstElementType) << std::endl;
272 logger.debug() << "Invalid SUBSCRIBE reply type: "
273 << static_cast<int>(replyType) << std::endl;
277 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
279 auto replyType = reply.getType();
280 if (replyType == Reply::Type::ARRAY)
282 auto& replyVector(*reply.getArray());
283 auto hostElementType = replyVector[0]->getType();
284 if (hostElementType == Reply::Type::STRING)
286 auto host(replyVector[0]->getString()->str);
287 auto portElementType = replyVector[1]->getType();
288 if (portElementType == Reply::Type::STRING)
290 auto port(replyVector[1]->getString()->str);
293 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
295 catch (const std::exception& e)
297 logger.debug() << "Invalid host or port in master inquiry reply, host: "
298 << host << ", port: " << port
299 << ", exception: " << e.what() << std::endl;
303 logger.debug() << "Invalid port element type in master inquiry reply: "
304 << static_cast<int>(portElementType) << std::endl;
307 logger.debug() << "Invalid host element type in master inquiry reply: "
308 << static_cast<int>(hostElementType) << std::endl;
311 logger.debug() << "Invalid master inquiry reply type: "
312 << static_cast<int>(replyType) << std::endl;
316 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
318 std::vector<std::string> splittedMessage;
319 boost::split(splittedMessage, message, boost::is_any_of(" "));
320 if (splittedMessage.size() == 5)
322 auto host = splittedMessage[3];
323 auto port = splittedMessage[4];
326 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
328 catch (const std::exception& e)
330 logger.debug() << "Invalid host or port in notification message, host: "
331 << host << ", port: " << port
332 << ", exception: " << e.what() << std::endl;
336 logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;