05815e24e53e5a8925a999554aa7479a5f941178
[ric-plt/sdl.git] / src / redis / asyncsentineldatabasediscovery.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include <arpa/inet.h>
18 #include <iostream>
19 #include <string>
20 #include <sdl/asyncstorage.hpp>
21 #include "private/abort.hpp"
22 #include "private/hostandport.hpp"
23 #include "private/redis/asyncsentineldatabasediscovery.hpp"
24 #include "private/redis/asynccommanddispatcher.hpp"
25 #include "private/redis/contents.hpp"
26 #include "private/redis/contentsbuilder.hpp"
27 #include "private/redis/reply.hpp"
28
29 using namespace shareddatalayer;
30 using namespace shareddatalayer::redis;
31
32 namespace
33 {
34     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
35                                                                           const DatabaseInfo& databaseInfo,
36                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
37                                                                           std::shared_ptr<Logger> logger);
38
39     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
40 }
41
42 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
43                                                                std::shared_ptr<Logger> logger):
44         AsyncSentinelDatabaseDiscovery(engine,
45                                        logger,
46                                        ::asyncCommandDispatcherCreator,
47                                        std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
48 {
49 }
50
51 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
52                                                                std::shared_ptr<Logger> logger,
53                                                                const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
54                                                                std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
55         engine(engine),
56         logger(logger),
57         // @TODO Make configurable.
58         databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
59                      DatabaseInfo::Type::SINGLE,
60                      boost::none,
61                      DatabaseInfo::Discovery::SENTINEL})),
62         contentsBuilder(contentsBuilder),
63         masterInquiryRetryTimer(*engine),
64         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
65 {
66     dispatcher = asyncCommandDispatcherCreator(*engine,
67                                                databaseInfo,
68                                                contentsBuilder,
69                                                logger);
70 }
71
72 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
73 {
74     stateChangedCb = cb;
75     dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
76 }
77
78 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
79 {
80     stateChangedCb = nullptr;
81 }
82
83 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
84 {
85     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
86                                         this,
87                                         std::placeholders::_1,
88                                         std::placeholders::_2),
89                               "dummyNamespace", // Not meaningful for SENTINEL commands
90                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
91 }
92
93 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
94                                                       const Reply& reply)
95 {
96     if (!error)
97     {
98         auto hostAndPort = parseMasterInquiryReply(reply, *logger);
99         if (hostAndPort)
100         {
101             auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
102                                            DatabaseInfo::Type::SINGLE,
103                                            boost::none,
104                                            DatabaseInfo::Discovery::SENTINEL}));
105             if (stateChangedCb)
106                 stateChangedCb(databaseInfo);
107         }
108         else
109             SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
110     }
111     else
112     {
113         masterInquiryRetryTimer.arm(
114                 masterInquiryRetryTimerDuration,
115                 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
116     }
117 }
118
119 namespace
120 {
121     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
122                                                                           const DatabaseInfo& databaseInfo,
123                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
124                                                                           std::shared_ptr<Logger> logger)
125     {
126         return AsyncCommandDispatcher::create(engine,
127                                               databaseInfo,
128                                               contentsBuilder,
129                                               false,
130                                               logger,
131                                               true);
132     }
133
134     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
135     {
136         auto replyType = reply.getType();
137         if (replyType == Reply::Type::ARRAY)
138         {
139             auto& replyVector(*reply.getArray());
140             auto hostElementType = replyVector[0]->getType();
141             if (hostElementType == Reply::Type::STRING)
142             {
143                 auto host(replyVector[0]->getString()->str);
144                 auto portElementType = replyVector[1]->getType();
145                 if (portElementType == Reply::Type::STRING)
146                 {
147                     auto port(replyVector[1]->getString()->str);
148                     try
149                     {
150                         return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
151                     }
152                     catch (const std::exception& e)
153                     {
154                         logger.debug() << "Invalid host or port in master inquiry reply, host: "
155                                        << host << ", port: " << port
156                                        << ", exception: " << e.what() << std::endl;
157                     }
158                 }
159                 else
160                     logger.debug() << "Invalid port element type in master inquiry reply: "
161                                    << static_cast<int>(portElementType) << std::endl;
162             }
163             else
164                 logger.debug() << "Invalid host element type in master inquiry reply: "
165                                << static_cast<int>(hostElementType) << std::endl;
166         }
167         else
168             logger.debug() << "Invalid master inquiry reply type: "
169                            << static_cast<int>(replyType) << std::endl;
170         return nullptr;
171     }
172 }