2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #include <arpa/inet.h>
20 #include <sdl/asyncstorage.hpp>
21 #include "private/abort.hpp"
22 #include "private/hostandport.hpp"
23 #include "private/redis/asyncsentineldatabasediscovery.hpp"
24 #include "private/redis/asynccommanddispatcher.hpp"
25 #include "private/redis/contents.hpp"
26 #include "private/redis/contentsbuilder.hpp"
27 #include "private/redis/reply.hpp"
29 using namespace shareddatalayer;
30 using namespace shareddatalayer::redis;
34 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
35 const DatabaseInfo& databaseInfo,
36 std::shared_ptr<ContentsBuilder> contentsBuilder,
37 std::shared_ptr<Logger> logger);
39 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
42 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
43 std::shared_ptr<Logger> logger):
44 AsyncSentinelDatabaseDiscovery(engine,
46 ::asyncCommandDispatcherCreator,
47 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
51 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
52 std::shared_ptr<Logger> logger,
53 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
54 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
57 // @TODO Make configurable.
58 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
59 DatabaseInfo::Type::SINGLE,
61 DatabaseInfo::Discovery::SENTINEL})),
62 contentsBuilder(contentsBuilder),
63 masterInquiryRetryTimer(*engine),
64 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
66 dispatcher = asyncCommandDispatcherCreator(*engine,
72 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
75 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
78 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
80 stateChangedCb = nullptr;
83 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
85 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
87 std::placeholders::_1,
88 std::placeholders::_2),
89 "dummyNamespace", // Not meaningful for SENTINEL commands
90 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
93 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
98 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
101 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
102 DatabaseInfo::Type::SINGLE,
104 DatabaseInfo::Discovery::SENTINEL}));
106 stateChangedCb(databaseInfo);
109 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
113 masterInquiryRetryTimer.arm(
114 masterInquiryRetryTimerDuration,
115 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
121 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
122 const DatabaseInfo& databaseInfo,
123 std::shared_ptr<ContentsBuilder> contentsBuilder,
124 std::shared_ptr<Logger> logger)
126 return AsyncCommandDispatcher::create(engine,
134 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
136 auto replyType = reply.getType();
137 if (replyType == Reply::Type::ARRAY)
139 auto& replyVector(*reply.getArray());
140 auto hostElementType = replyVector[0]->getType();
141 if (hostElementType == Reply::Type::STRING)
143 auto host(replyVector[0]->getString()->str);
144 auto portElementType = replyVector[1]->getType();
145 if (portElementType == Reply::Type::STRING)
147 auto port(replyVector[1]->getString()->str);
150 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
152 catch (const std::exception& e)
154 logger.debug() << "Invalid host or port in master inquiry reply, host: "
155 << host << ", port: " << port
156 << ", exception: " << e.what() << std::endl;
160 logger.debug() << "Invalid port element type in master inquiry reply: "
161 << static_cast<int>(portElementType) << std::endl;
164 logger.debug() << "Invalid host element type in master inquiry reply: "
165 << static_cast<int>(hostElementType) << std::endl;
168 logger.debug() << "Invalid master inquiry reply type: "
169 << static_cast<int>(replyType) << std::endl;