Multiple DBAAS Redis standalone groups
[ric-plt/sdl.git] / src / asyncstorageimpl.cpp
index b17fcbd..a018c51 100644 (file)
    limitations under the License.
 */
 
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+*/
+
 #include "config.h"
 #include "private/error.hpp"
 #include "private/abort.hpp"
 #include "private/engine.hpp"
 #include "private/logger.hpp"
 #if HAVE_REDIS
-#include "private/redis/asyncdatabasediscovery.hpp"
 #include "private/redis/asyncredisstorage.hpp"
 #endif
 
+#include <boost/optional/optional_io.hpp>
+#include <boost/crc.hpp>
+
 using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+        std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+                                                                              const std::string& ns,
+                                                                              const DatabaseConfiguration& databaseConfiguration,
+                                                                              const boost::optional<std::size_t>& addressIndex,
+                                                                              std::shared_ptr<Logger> logger)
+        {
+            return AsyncDatabaseDiscovery::create(engine,
+                                                  ns,
+                                                  databaseConfiguration,
+                                                  addressIndex,
+                                                  logger);
+        }
+
+        std::uint32_t crc32(const std::string& s)
+        {
+           boost::crc_32_type result;
+           result.process_bytes(s.data(), s.size());
+           return result.checksum();
+        }
+
+        std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
+        {
+            return crc32(s)%count;
+        }
+}
 
 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
                                    const boost::optional<PublisherId>& pId,
@@ -36,7 +72,8 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
     databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
     namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
     publisherId(pId),
-    logger(logger)
+    logger(logger),
+    asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
 {
     ConfigurationReader configurationReader(logger);
     configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
@@ -48,29 +85,73 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
                                    const boost::optional<PublisherId>& pId,
                                    std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
                                    std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
-                                   std::shared_ptr<Logger> logger):
+                                   std::shared_ptr<Logger> logger,
+                                   const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
     engine(engine),
     databaseConfiguration(databaseConfiguration),
     namespaceConfigurations(namespaceConfigurations),
     publisherId(pId),
-    logger(logger)
-{
-}
-
-AsyncStorage& AsyncStorageImpl::getRedisHandler()
+    logger(logger),
+    asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
+{
+}
+
+void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns)
+{
+    static auto serverCount = databaseConfiguration->getServerAddresses().size();
+    for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++)
+    {
+        auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+                                                                asyncDatabaseDiscoveryCreator(
+                                                                        engine,
+                                                                        ns,
+                                                                        std::ref(*databaseConfiguration),
+                                                                        addrIndex,
+                                                                        logger),
+                                                                publisherId,
+                                                                namespaceConfigurations,
+                                                                logger);
+        asyncStorages.push_back(redisHandler);
+    }
+}
+
+void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
+{
+    if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+        DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+    {
+            setAsyncRedisStorageHandlersForCluster(ns);
+            return;
+    }
+    auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+                                                            asyncDatabaseDiscoveryCreator(
+                                                                    engine,
+                                                                    ns,
+                                                                    std::ref(*databaseConfiguration),
+                                                                    boost::none,
+                                                                    logger),
+                                                            publisherId,
+                                                            namespaceConfigurations,
+                                                            logger);
+    asyncStorages.push_back(redisHandler);
+}
+
+AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
+{
+    std::size_t handlerIndex{0};
+    if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+        DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+        handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
+    return *asyncStorages.at(handlerIndex);
+}
+
+AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
 {
 #if HAVE_REDIS
-    static AsyncRedisStorage redisHandler{engine,
-                                          redis::AsyncDatabaseDiscovery::create(
-                                              engine,
-                                              boost::none,
-                                              std::ref(*databaseConfiguration),
-                                              logger),
-                                          publisherId,
-                                          namespaceConfigurations,
-                                          logger};
-
-    return redisHandler;
+    if (asyncStorages.empty())
+            setAsyncRedisStorageHandlers(ns);
+
+    return getAsyncRedisStorageHandler(ns);
 #else
     logger->error() << "Redis operations cannot be performed, Redis not enabled";
     SHAREDDATALAYER_ABORT("Invalid configuration.");
@@ -86,7 +167,7 @@ AsyncStorage& AsyncStorageImpl::getDummyHandler()
 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
 {
     if (namespaceConfigurations->isDbBackendUseEnabled(ns))
-        return getRedisHandler();
+        return getRedisHandler(ns);
 
     return getDummyHandler();
 }