2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include <arpa/inet.h>
18 #include <boost/algorithm/string.hpp>
22 #include <sdl/asyncstorage.hpp>
23 #include "private/abort.hpp"
24 #include "private/hostandport.hpp"
25 #include "private/redis/asyncsentineldatabasediscovery.hpp"
26 #include "private/redis/asynccommanddispatcher.hpp"
27 #include "private/redis/contents.hpp"
28 #include "private/redis/contentsbuilder.hpp"
29 #include "private/redis/reply.hpp"
31 using namespace shareddatalayer;
32 using namespace shareddatalayer::redis;
// Forward declaration of the file-local dispatcher factory. Its definition
// further down in this file returns the result of AsyncCommandDispatcher::create().
// The two-argument AsyncSentinelDatabaseDiscovery constructor passes it on as
// ::asyncCommandDispatcherCreator.
36 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
37 const DatabaseInfo& databaseInfo,
38 std::shared_ptr<ContentsBuilder> contentsBuilder,
39 std::shared_ptr<Logger> logger,
40 bool usePermanentCommandCallbacks);
// Classification of a reply seen by parseSubscribeReply():
//   UNKNOWN         - initial value set by the constructor below,
//   SUBSCRIBE_REPLY - first array element was the string "subscribe",
//   NOTIFICATION    - first array element was the string "message".
// NOTE(review): the enclosing type header and its data members are not visible
// in this view; other code in this file reads `type` and `message` from it.
44 enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
// Default construction leaves the reply classified as UNKNOWN.
48 SubscribeReply(): type(Type::UNKNOWN) { }
// Forward declaration: classifies a reply received for the SUBSCRIBE command
// (defined further down in this file). Diagnostics go to `logger`.
51 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
// Forward declaration: extracts the master's host and port from the reply to
// the master inquiry command (defined further down in this file).
53 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
// Forward declaration: extracts a host and port from the space-separated text
// of a notification message (defined further down in this file).
55 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
// Convenience constructor: delegates to the constructor overload that also
// takes a dispatcher factory and a ContentsBuilder, supplying the file-local
// ::asyncCommandDispatcherCreator and a ContentsBuilder created with
// AsyncStorage::SEPARATOR.
// NOTE(review): not every delegated argument line is visible in this view.
58 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
59 std::shared_ptr<Logger> logger):
60 AsyncSentinelDatabaseDiscovery(engine,
62 ::asyncCommandDispatcherCreator,
63 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
// Main constructor. Initializes the Sentinel connection description and the
// two retry timers, then creates two command dispatchers through the supplied
// factory: `subscriber` and `dispatcher`.
// NOTE(review): some member initializers and the remaining factory-call
// arguments are not visible in this view.
67 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
68 std::shared_ptr<Logger> logger,
69 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
70 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
73 // @TODO Make configurable.
// Hard-coded Sentinel address: host "dbaas-ha", port 26379 converted with
// htons(), described as a SINGLE database discovered via SENTINEL.
74 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
75 DatabaseInfo::Type::SINGLE,
77 DatabaseInfo::Discovery::SENTINEL})),
78 contentsBuilder(contentsBuilder),
// Both retry timers run on the given engine and use a fixed one second delay.
79 subscribeRetryTimer(*engine),
80 subscribeRetryTimerDuration(std::chrono::seconds(1)),
81 masterInquiryRetryTimer(*engine),
82 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
// `subscriber` is later used for the SUBSCRIBE command (subscribeNotifications).
84 subscriber = asyncCommandDispatcherCreator(*engine,
// `dispatcher` is later used for the master inquiry command (sendMasterInquiry).
89 dispatcher = asyncCommandDispatcherCreator(*engine,
// Destructor: disables command callbacks on both dispatchers and clears the
// state-changed callback.
// NOTE(review): lines between these statements are not visible in this view
// (possibly guards around the two calls) - confirm against the full file.
96 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
99 subscriber->disableCommandCallbacks();
101 dispatcher->disableCommandCallbacks();
102 stateChangedCb = nullptr;
// Installs the state-changed callback and starts discovery: registers a
// disconnect callback on `subscriber` and asks `subscriber` to call
// subscribeNotifications() once it is connected.
// NOTE(review): the statement storing `cb` and the lambda's braces are not
// visible in this view.
105 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
// The lambda captures `this`.
108 subscriber->registerDisconnectCb([this]()
// Presumably the body of the disconnect lambda: re-subscribe after reconnecting.
110 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
// Initial subscription, issued once the subscriber connection is up.
112 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
// Removes the state-changed callback by resetting it to nullptr.
115 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
117 stateChangedCb = nullptr;
// Sends "SUBSCRIBE +switch-master" through `subscriber`; replies are delivered
// to subscribeAck() with the two placeholder arguments (error code and reply).
// NOTE(review): one std::bind argument line is not visible in this view.
120 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
122 subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
124 std::placeholders::_1,
125 std::placeholders::_2),
126 "dummyNamespace", // Not meaningful for Sentinel
127 contentsBuilder->build("SUBSCRIBE", "+switch-master"));
// Callback for replies to the SUBSCRIBE command. The reply is classified with
// parseSubscribeReply() and handled by type:
//   SUBSCRIBE_REPLY - wait for `dispatcher` to connect, then sendMasterInquiry();
//   NOTIFICATION    - parse the message into a host and port, build a
//                     SINGLE/SENTINEL DatabaseInfo and pass it to stateChangedCb;
//   UNKNOWN         - log and abort.
// Parsing failures abort the process via SHAREDDATALAYER_ABORT.
// NOTE(review): the conditions guarding each branch are not visible in this
// view. The retry timer at the end is presumably armed when `error` is set -
// confirm against the full file.
130 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
135 auto subscribeReply = parseSubscribeReply(reply, *logger);
138 switch (subscribeReply->type)
140 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
// Subscription confirmed: ask for the current master once `dispatcher` is connected.
142 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
145 case (SubscribeReply::Type::NOTIFICATION):
147 auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
// This local `databaseInfo` has the same name as the member initialized in the constructor.
150 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
151 DatabaseInfo::Type::SINGLE,
153 DatabaseInfo::Discovery::SENTINEL}));
155 stateChangedCb(databaseInfo);
158 SHAREDDATALAYER_ABORT("Notification message parsing error.");
161 case (SubscribeReply::Type::UNKNOWN):
163 logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
164 SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
169 SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
// Retry: call subscribeNotifications() again after subscribeRetryTimerDuration.
172 subscribeRetryTimer.arm(
173 subscribeRetryTimerDuration,
174 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
// Sends "SENTINEL get-master-addr-by-name mymaster" through `dispatcher`; the
// reply is delivered to masterInquiryAck() with the two placeholder arguments.
// NOTE(review): one std::bind argument line is not visible in this view.
177 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
179 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
181 std::placeholders::_1,
182 std::placeholders::_2),
183 "dummyNamespace", // Not meaningful for Sentinel
184 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
// Callback for the master inquiry reply. Parses the reply into a host and
// port, builds a SINGLE/SENTINEL DatabaseInfo from it and passes that to
// stateChangedCb. A parsing failure aborts via SHAREDDATALAYER_ABORT.
// NOTE(review): the guarding conditions are not visible in this view. The
// retry timer at the end is presumably armed when `error` is set - confirm
// against the full file.
187 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
192 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
195 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
196 DatabaseInfo::Type::SINGLE,
198 DatabaseInfo::Discovery::SENTINEL}));
200 stateChangedCb(databaseInfo);
203 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
// Retry: call sendMasterInquiry() again after masterInquiryRetryTimerDuration.
207 masterInquiryRetryTimer.arm(
208 masterInquiryRetryTimerDuration,
209 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
// Default dispatcher factory: returns the result of
// AsyncCommandDispatcher::create(), called with `engine` and
// `usePermanentCommandCallbacks` among its arguments.
// NOTE(review): the remaining create() arguments are not visible in this view.
215 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
216 const DatabaseInfo& databaseInfo,
217 std::shared_ptr<ContentsBuilder> contentsBuilder,
218 std::shared_ptr<Logger> logger,
219 bool usePermanentCommandCallbacks)
221 return AsyncCommandDispatcher::create(engine,
224 usePermanentCommandCallbacks,
229 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
231 // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
232 auto replyType = reply.getType();
233 if (replyType == Reply::Type::ARRAY)
235 auto& replyVector(*reply.getArray());
236 auto firstElementType = replyVector[0]->getType();
237 if (firstElementType == Reply::Type::STRING)
239 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
240 auto kind(replyVector[0]->getString()->str);
241 if (kind == "subscribe")
243 subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
244 return subscribeReply;
246 else if (kind == "message")
248 subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
249 auto thirdElementType = replyVector[2]->getType();
250 if (thirdElementType == Reply::Type::STRING)
252 subscribeReply->message = replyVector[2]->getString()->str;
253 return subscribeReply;
256 logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
259 logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
262 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
263 << static_cast<int>(firstElementType) << std::endl;
266 logger.debug() << "Invalid SUBSCRIBE reply type: "
267 << static_cast<int>(replyType) << std::endl;
271 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
273 auto replyType = reply.getType();
274 if (replyType == Reply::Type::ARRAY)
276 auto& replyVector(*reply.getArray());
277 auto hostElementType = replyVector[0]->getType();
278 if (hostElementType == Reply::Type::STRING)
280 auto host(replyVector[0]->getString()->str);
281 auto portElementType = replyVector[1]->getType();
282 if (portElementType == Reply::Type::STRING)
284 auto port(replyVector[1]->getString()->str);
287 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
289 catch (const std::exception& e)
291 logger.debug() << "Invalid host or port in master inquiry reply, host: "
292 << host << ", port: " << port
293 << ", exception: " << e.what() << std::endl;
297 logger.debug() << "Invalid port element type in master inquiry reply: "
298 << static_cast<int>(portElementType) << std::endl;
301 logger.debug() << "Invalid host element type in master inquiry reply: "
302 << static_cast<int>(hostElementType) << std::endl;
305 logger.debug() << "Invalid master inquiry reply type: "
306 << static_cast<int>(replyType) << std::endl;
310 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
312 std::vector<std::string> splittedMessage;
313 boost::split(splittedMessage, message, boost::is_any_of(" "));
314 if (splittedMessage.size() == 5)
316 auto host = splittedMessage[3];
317 auto port = splittedMessage[4];
320 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
322 catch (const std::exception& e)
324 logger.debug() << "Invalid host or port in notification message, host: "
325 << host << ", port: " << port
326 << ", exception: " << e.what() << std::endl;
330 logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;