/* Copyright (c) 2018-2019 Nokia. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include #include #include #include #include "private/abort.hpp" #include "private/hostandport.hpp" #include "private/redis/asyncsentineldatabasediscovery.hpp" #include "private/redis/asynccommanddispatcher.hpp" #include "private/redis/contents.hpp" #include "private/redis/contentsbuilder.hpp" #include "private/redis/reply.hpp" using namespace shareddatalayer; using namespace shareddatalayer::redis; namespace { std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, std::shared_ptr logger); std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger); } AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, std::shared_ptr logger): AsyncSentinelDatabaseDiscovery(engine, logger, ::asyncCommandDispatcherCreator, std::make_shared(AsyncStorage::SEPARATOR)) { } AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr engine, std::shared_ptr logger, const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator, std::shared_ptr contentsBuilder): engine(engine), logger(logger), // @TODO Make configurable. databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}), DatabaseInfo::Type::SINGLE, boost::none, DatabaseInfo::Discovery::SENTINEL})), contentsBuilder(contentsBuilder), masterInquiryRetryTimer(*engine), masterInquiryRetryTimerDuration(std::chrono::seconds(1)) { dispatcher = asyncCommandDispatcherCreator(*engine, databaseInfo, contentsBuilder, logger); } void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb) { stateChangedCb = cb; dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); } void AsyncSentinelDatabaseDiscovery::clearStateChangedCb() { stateChangedCb = nullptr; } void AsyncSentinelDatabaseDiscovery::sendMasterInquiry() { dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck, this, std::placeholders::_1, std::placeholders::_2), "dummyNamespace", // Not meaningful for SENTINEL commands contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable } void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error, const Reply& reply) { if (!error) { auto hostAndPort = parseMasterInquiryReply(reply, *logger); if (hostAndPort) { auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}), DatabaseInfo::Type::SINGLE, boost::none, DatabaseInfo::Discovery::SENTINEL})); if (stateChangedCb) stateChangedCb(databaseInfo); } else SHAREDDATALAYER_ABORT("Master inquiry reply parsing error."); } else { masterInquiryRetryTimer.arm( masterInquiryRetryTimerDuration, std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); } } namespace { std::shared_ptr asyncCommandDispatcherCreator(Engine& engine, const DatabaseInfo& databaseInfo, std::shared_ptr contentsBuilder, std::shared_ptr logger) { return AsyncCommandDispatcher::create(engine, databaseInfo, contentsBuilder, false, logger, true); } std::unique_ptr parseMasterInquiryReply(const Reply& reply, Logger& logger) { auto replyType = reply.getType(); if (replyType == Reply::Type::ARRAY) { auto& replyVector(*reply.getArray()); auto hostElementType = replyVector[0]->getType(); if (hostElementType == Reply::Type::STRING) { auto host(replyVector[0]->getString()->str); auto portElementType = replyVector[1]->getType(); if (portElementType == Reply::Type::STRING) { auto port(replyVector[1]->getString()->str); try { return std::unique_ptr(new HostAndPort(host+":"+port, 0));; } catch (const std::exception& e) { logger.debug() << "Invalid host or port in master inquiry reply, host: " << host << ", port: " << port << ", exception: " << e.what() << std::endl; } } else logger.debug() << "Invalid port element type in master inquiry reply: " << static_cast(portElementType) << std::endl; } else logger.debug() << "Invalid host element type in master inquiry reply: " << static_cast(hostElementType) << std::endl; } else logger.debug() << "Invalid master inquiry reply type: " << static_cast(replyType) << std::endl; return nullptr; } }