Add Sentinel change notification handling logic
[ric-plt/sdl.git] / src / redis / asyncsentineldatabasediscovery.cpp
index 05815e2..be51c5e 100644 (file)
 */
 
 #include <arpa/inet.h>
+#include <boost/algorithm/string.hpp>
 #include <iostream>
 #include <string>
+#include <vector>
 #include <sdl/asyncstorage.hpp>
 #include "private/abort.hpp"
 #include "private/hostandport.hpp"
@@ -34,9 +36,23 @@ namespace
     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
                                                                           const DatabaseInfo& databaseInfo,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
-                                                                          std::shared_ptr<Logger> logger);
+                                                                          std::shared_ptr<Logger> logger,
+                                                                          bool usePermanentCommandCallbacks);
+
+    struct SubscribeReply
+    {
+        enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
+        Type type;
+        std::string message;
+
+        SubscribeReply(): type(Type::UNKNOWN) { }
+    };
+
+    std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
 
     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
+
+    std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
 }
 
 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
@@ -60,19 +76,40 @@ AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<E
                      boost::none,
                      DatabaseInfo::Discovery::SENTINEL})),
         contentsBuilder(contentsBuilder),
+        subscribeRetryTimer(*engine),
+        subscribeRetryTimerDuration(std::chrono::seconds(1)),
         masterInquiryRetryTimer(*engine),
         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
 {
+    subscriber = asyncCommandDispatcherCreator(*engine,
+                                               databaseInfo,
+                                               contentsBuilder,
+                                               logger,
+                                               true);
     dispatcher = asyncCommandDispatcherCreator(*engine,
                                                databaseInfo,
                                                contentsBuilder,
-                                               logger);
+                                               logger,
+                                               false);
+}
+
+AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
+{
+    if (subscriber)
+        subscriber->disableCommandCallbacks();
+    if (dispatcher)
+        dispatcher->disableCommandCallbacks();
+    stateChangedCb = nullptr;
 }
 
 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
 {
     stateChangedCb = cb;
-    dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+    subscriber->registerDisconnectCb([this]()
+            {
+                subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+            });
+    subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
 }
 
 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
@@ -80,13 +117,70 @@ void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
     stateChangedCb = nullptr;
 }
 
+void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
+{
+    subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              "dummyNamespace", // Not meaningful for Sentinel
+                              contentsBuilder->build("SUBSCRIBE", "+switch-master"));
+}
+
+void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
+                                                  const Reply& reply)
+{
+    if (!error)
+    {
+        auto subscribeReply = parseSubscribeReply(reply, *logger);
+        if (subscribeReply)
+        {
+            switch (subscribeReply->type)
+            {
+                case (SubscribeReply::Type::SUBSCRIBE_REPLY):
+                {
+                    dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+                    break;
+                }
+                case (SubscribeReply::Type::NOTIFICATION):
+                {
+                    auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
+                    if (hostAndPort)
+                    {
+                        auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
+                                                       DatabaseInfo::Type::SINGLE,
+                                                       boost::none,
+                                                       DatabaseInfo::Discovery::SENTINEL}));
+                        if (stateChangedCb)
+                            stateChangedCb(databaseInfo);
+                    }
+                    else
+                        SHAREDDATALAYER_ABORT("Notification message parsing error.");
+                    break;
+                }
+                case (SubscribeReply::Type::UNKNOWN):
+                {
+                    logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
+                    SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
+                }
+            }
+        }
+        else
+            SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
+    }
+    else
+        subscribeRetryTimer.arm(
+                subscribeRetryTimerDuration,
+                std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
+}
+
 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
 {
     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
                                         this,
                                         std::placeholders::_1,
                                         std::placeholders::_2),
-                              "dummyNamespace", // Not meaningful for SENTINEL commands
+                              "dummyNamespace", // Not meaningful for Sentinel
                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
 }
 
@@ -121,16 +215,59 @@ namespace
     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
                                                                           const DatabaseInfo& databaseInfo,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
-                                                                          std::shared_ptr<Logger> logger)
+                                                                          std::shared_ptr<Logger> logger,
+                                                                          bool usePermanentCommandCallbacks)
     {
         return AsyncCommandDispatcher::create(engine,
                                               databaseInfo,
                                               contentsBuilder,
-                                              false,
+                                              usePermanentCommandCallbacks,
                                               logger,
                                               true);
     }
 
+    std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
+    {
+        // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
+        auto replyType = reply.getType();
+        if (replyType == Reply::Type::ARRAY)
+        {
+            auto& replyVector(*reply.getArray());
+            auto firstElementType = replyVector[0]->getType();
+            if (firstElementType == Reply::Type::STRING)
+            {
+                auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
+                auto kind(replyVector[0]->getString()->str);
+                if (kind == "subscribe")
+                {
+                    subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
+                    return subscribeReply;
+                }
+                else if (kind == "message")
+                {
+                    subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
+                    auto thirdElementType = replyVector[2]->getType();
+                    if (thirdElementType == Reply::Type::STRING)
+                    {
+                        subscribeReply->message = replyVector[2]->getString()->str;
+                        return subscribeReply;
+                    }
+                    else
+                        logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
+                }
+                else
+                    logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
+            }
+            else
+                logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
+                               << static_cast<int>(firstElementType) << std::endl;
+        }
+        else
+            logger.debug() << "Invalid SUBSCRIBE reply type: "
+                           << static_cast<int>(replyType) << std::endl;
+        return nullptr;
+    }
+
     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
     {
         auto replyType = reply.getType();
@@ -169,4 +306,28 @@ namespace
                            << static_cast<int>(replyType) << std::endl;
         return nullptr;
     }
+
+    std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
+    {
+        std::vector<std::string> splittedMessage;
+        boost::split(splittedMessage, message, boost::is_any_of(" "));
+        if (splittedMessage.size() == 5)
+        {
+            auto host = splittedMessage[3];
+            auto port = splittedMessage[4];
+            try
+            {
+                return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
+            }
+            catch (const std::exception& e)
+            {
+                logger.debug() << "Invalid host or port in notification message, host: "
+                               << host << ", port: " << port
+                               << ", exception: " << e.what() << std::endl;
+            }
+        }
+        else
+            logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
+        return nullptr;
+    }
 }