Multiple DBAAS Redis Sentinel groups
[ric-plt/sdl.git] / src / asyncstorageimpl.cpp
index 31ac713..d9478c2 100644 (file)
 #include "private/redis/asyncredisstorage.hpp"
 #endif
 
+#include <boost/optional/optional_io.hpp>
+#include <boost/crc.hpp>
+
 using namespace shareddatalayer;
 using namespace shareddatalayer::redis;
 
 namespace
 {
         std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+                                                                              const std::string& ns,
                                                                               const DatabaseConfiguration& databaseConfiguration,
+                                                                              const boost::optional<std::size_t>& addressIndex,
                                                                               std::shared_ptr<Logger> logger)
         {
             return AsyncDatabaseDiscovery::create(engine,
-                                                  boost::none,
+                                                  ns,
                                                   databaseConfiguration,
+                                                  addressIndex,
                                                   logger);
         }
+
+        std::uint32_t crc32(const std::string& s)
+        {
+           boost::crc_32_type result;
+           result.process_bytes(s.data(), s.size());
+           return result.checksum();
+        }
+
+        std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
+        {
+            return crc32(s)%count;
+        }
 }
 
 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
@@ -78,19 +96,40 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
 {
 }
 
-AsyncStorage& AsyncStorageImpl::getRedisHandler()
+void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
+{
+    for (std::size_t i = 0; i < databaseConfiguration->getServerAddresses().size(); i++)
+    {
+        auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+                                                                asyncDatabaseDiscoveryCreator(
+                                                                        engine,
+                                                                        ns,
+                                                                        std::ref(*databaseConfiguration),
+                                                                        i,
+                                                                        logger),
+                                                                publisherId,
+                                                                namespaceConfigurations,
+                                                                logger);
+        asyncStorages.push_back(redisHandler);
+    }
+}
+
+AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
+{
+    std::size_t handlerIndex{0};
+    if (DatabaseConfiguration::DbType::SDL_CLUSTER == databaseConfiguration->getDbType())
+        handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
+    return *asyncStorages.at(handlerIndex);
+}
+
+AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
 {
 #if HAVE_REDIS
-    static AsyncRedisStorage redisHandler{engine,
-                                          asyncDatabaseDiscoveryCreator(
-                                              engine,
-                                              std::ref(*databaseConfiguration),
-                                              logger),
-                                          publisherId,
-                                          namespaceConfigurations,
-                                          logger};
-
-    return redisHandler;
+    auto serverAddresses(databaseConfiguration->getServerAddresses());
+    if (asyncStorages.empty())
+            setAsyncRedisStorageHandlers(ns);
+
+    return getAsyncRedisStorageHandler(ns);
 #else
     logger->error() << "Redis operations cannot be performed, Redis not enabled";
     SHAREDDATALAYER_ABORT("Invalid configuration.");
@@ -106,7 +145,7 @@ AsyncStorage& AsyncStorageImpl::getDummyHandler()
 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
 {
     if (namespaceConfigurations->isDbBackendUseEnabled(ns))
-        return getRedisHandler();
+        return getRedisHandler(ns);
 
     return getDummyHandler();
 }