2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include <arpa/inet.h>
23 #include <boost/algorithm/string.hpp>
27 #include <sdl/asyncstorage.hpp>
28 #include "private/abort.hpp"
29 #include "private/hostandport.hpp"
30 #include "private/redis/asyncsentineldatabasediscovery.hpp"
31 #include "private/redis/asynccommanddispatcher.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/reply.hpp"
36 using namespace shareddatalayer;
37 using namespace shareddatalayer::redis;
41 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
42 const DatabaseInfo& databaseInfo,
43 std::shared_ptr<ContentsBuilder> contentsBuilder,
44 std::shared_ptr<Logger> logger,
45 bool usePermanentCommandCallbacks);
49 enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
53 SubscribeReply(): type(Type::UNKNOWN) { }
56 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
58 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
60 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
63 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
64 std::shared_ptr<Logger> logger,
65 const HostAndPort& sentinelAddress,
66 const std::string& sentinelMasterName):
67 AsyncSentinelDatabaseDiscovery(engine,
71 ::asyncCommandDispatcherCreator,
72 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
76 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
77 std::shared_ptr<Logger> logger,
78 const HostAndPort& sentinelAddress,
79 const std::string& sentinelMasterName,
80 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
81 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
84 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
85 DatabaseInfo::Type::SINGLE,
87 DatabaseInfo::Discovery::SENTINEL})),
88 sentinelMasterName(sentinelMasterName),
89 contentsBuilder(contentsBuilder),
90 subscribeRetryTimer(*engine),
91 subscribeRetryTimerDuration(std::chrono::seconds(1)),
92 masterInquiryRetryTimer(*engine),
93 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
95 subscriber = asyncCommandDispatcherCreator(*engine,
100 dispatcher = asyncCommandDispatcherCreator(*engine,
107 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
110 subscriber->disableCommandCallbacks();
112 dispatcher->disableCommandCallbacks();
113 stateChangedCb = nullptr;
116 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
119 subscriber->registerDisconnectCb([this]()
121 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
123 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
126 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
128 stateChangedCb = nullptr;
131 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
133 subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
135 std::placeholders::_1,
136 std::placeholders::_2),
137 "dummyNamespace", // Not meaningful for Sentinel
138 contentsBuilder->build("SUBSCRIBE", "+switch-master"));
141 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
146 auto subscribeReply = parseSubscribeReply(reply, *logger);
149 switch (subscribeReply->type)
151 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
153 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
156 case (SubscribeReply::Type::NOTIFICATION):
158 auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
161 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
162 DatabaseInfo::Type::SINGLE,
164 DatabaseInfo::Discovery::SENTINEL}));
166 stateChangedCb(databaseInfo);
169 SHAREDDATALAYER_ABORT("Notification message parsing error.");
172 case (SubscribeReply::Type::UNKNOWN):
174 logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
175 SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
180 SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
183 subscribeRetryTimer.arm(
184 subscribeRetryTimerDuration,
185 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
188 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
190 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
192 std::placeholders::_1,
193 std::placeholders::_2),
194 "dummyNamespace", // Not meaningful for Sentinel
195 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
198 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
203 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
206 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
207 DatabaseInfo::Type::SINGLE,
209 DatabaseInfo::Discovery::SENTINEL}));
211 stateChangedCb(databaseInfo);
214 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
218 masterInquiryRetryTimer.arm(
219 masterInquiryRetryTimerDuration,
220 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
226 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
227 const DatabaseInfo& databaseInfo,
228 std::shared_ptr<ContentsBuilder> contentsBuilder,
229 std::shared_ptr<Logger> logger,
230 bool usePermanentCommandCallbacks)
232 return AsyncCommandDispatcher::create(engine,
235 usePermanentCommandCallbacks,
240 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
242 // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
243 auto replyType = reply.getType();
244 if (replyType == Reply::Type::ARRAY)
246 auto& replyVector(*reply.getArray());
247 auto firstElementType = replyVector[0]->getType();
248 if (firstElementType == Reply::Type::STRING)
250 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
251 auto kind(replyVector[0]->getString()->str);
252 if (kind == "subscribe")
254 subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
255 return subscribeReply;
257 else if (kind == "message")
259 subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
260 auto thirdElementType = replyVector[2]->getType();
261 if (thirdElementType == Reply::Type::STRING)
263 subscribeReply->message = replyVector[2]->getString()->str;
264 return subscribeReply;
267 logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
270 logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
273 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
274 << static_cast<int>(firstElementType) << std::endl;
277 logger.debug() << "Invalid SUBSCRIBE reply type: "
278 << static_cast<int>(replyType) << std::endl;
282 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
284 auto replyType = reply.getType();
285 if (replyType == Reply::Type::ARRAY)
287 auto& replyVector(*reply.getArray());
288 auto hostElementType = replyVector[0]->getType();
289 if (hostElementType == Reply::Type::STRING)
291 auto host(replyVector[0]->getString()->str);
292 auto portElementType = replyVector[1]->getType();
293 if (portElementType == Reply::Type::STRING)
295 auto port(replyVector[1]->getString()->str);
298 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
300 catch (const std::exception& e)
302 logger.debug() << "Invalid host or port in master inquiry reply, host: "
303 << host << ", port: " << port
304 << ", exception: " << e.what() << std::endl;
308 logger.debug() << "Invalid port element type in master inquiry reply: "
309 << static_cast<int>(portElementType) << std::endl;
312 logger.debug() << "Invalid host element type in master inquiry reply: "
313 << static_cast<int>(hostElementType) << std::endl;
316 logger.debug() << "Invalid master inquiry reply type: "
317 << static_cast<int>(replyType) << std::endl;
321 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
323 std::vector<std::string> splittedMessage;
324 boost::split(splittedMessage, message, boost::is_any_of(" "));
325 if (splittedMessage.size() == 5)
327 auto host = splittedMessage[3];
328 auto port = splittedMessage[4];
331 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
333 catch (const std::exception& e)
335 logger.debug() << "Invalid host or port in notification message, host: "
336 << host << ", port: " << port
337 << ", exception: " << e.what() << std::endl;
341 logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;