X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fasyncstorageimpl.cpp;h=a018c51b3639f591903e1160a2b46f44a17530ae;hb=69b0a71f5fe8825fa45ee9502a41fd8b465c44e0;hp=b17fcbd0ee7c60c86c145c35b05673d853421232;hpb=ef2bf51d04aaf01fa0cabdcaf905b23423067662;p=ric-plt%2Fsdl.git diff --git a/src/asyncstorageimpl.cpp b/src/asyncstorageimpl.cpp index b17fcbd..a018c51 100644 --- a/src/asyncstorageimpl.cpp +++ b/src/asyncstorageimpl.cpp @@ -14,6 +14,11 @@ limitations under the License. */ +/* + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). +*/ + #include "config.h" #include "private/error.hpp" #include "private/abort.hpp" @@ -23,11 +28,42 @@ #include "private/engine.hpp" #include "private/logger.hpp" #if HAVE_REDIS -#include "private/redis/asyncdatabasediscovery.hpp" #include "private/redis/asyncredisstorage.hpp" #endif +#include +#include + using namespace shareddatalayer; +using namespace shareddatalayer::redis; + +namespace +{ + std::shared_ptr asyncDatabaseDiscoveryCreator(std::shared_ptr engine, + const std::string& ns, + const DatabaseConfiguration& databaseConfiguration, + const boost::optional& addressIndex, + std::shared_ptr logger) + { + return AsyncDatabaseDiscovery::create(engine, + ns, + databaseConfiguration, + addressIndex, + logger); + } + + std::uint32_t crc32(const std::string& s) + { + boost::crc_32_type result; + result.process_bytes(s.data(), s.size()); + return result.checksum(); + } + + std::uint32_t getClusterHashIndex(const std::string& s, const size_t count) + { + return crc32(s)%count; + } +} AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, const boost::optional& pId, @@ -36,7 +72,8 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, databaseConfiguration(std::make_shared()), namespaceConfigurations(std::make_shared()), publisherId(pId), - logger(logger) + logger(logger), + asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator) { ConfigurationReader configurationReader(logger); configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration)); @@ -48,29 +85,73 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, const boost::optional& pId, std::shared_ptr databaseConfiguration, std::shared_ptr namespaceConfigurations, - std::shared_ptr logger): + std::shared_ptr logger, + const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator): engine(engine), databaseConfiguration(databaseConfiguration), namespaceConfigurations(namespaceConfigurations), publisherId(pId), - logger(logger) -{ -} - -AsyncStorage& AsyncStorageImpl::getRedisHandler() + logger(logger), + asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator) +{ +} + +void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns) +{ + static auto serverCount = databaseConfiguration->getServerAddresses().size(); + for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++) + { + auto redisHandler = std::make_shared(engine, + asyncDatabaseDiscoveryCreator( + engine, + ns, + std::ref(*databaseConfiguration), + addrIndex, + logger), + publisherId, + namespaceConfigurations, + logger); + asyncStorages.push_back(redisHandler); + } +} + +void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns) +{ + if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() || + DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType()) + { + setAsyncRedisStorageHandlersForCluster(ns); + return; + } + auto redisHandler = std::make_shared(engine, + asyncDatabaseDiscoveryCreator( + engine, + ns, + std::ref(*databaseConfiguration), + boost::none, + logger), + publisherId, + namespaceConfigurations, + logger); + asyncStorages.push_back(redisHandler); +} + +AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns) +{ + std::size_t handlerIndex{0}; + if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() || + DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType()) + handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size()); + return *asyncStorages.at(handlerIndex); +} + +AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns) { #if HAVE_REDIS - static AsyncRedisStorage redisHandler{engine, - redis::AsyncDatabaseDiscovery::create( - engine, - boost::none, - std::ref(*databaseConfiguration), - logger), - publisherId, - namespaceConfigurations, - logger}; - - return redisHandler; + if (asyncStorages.empty()) + setAsyncRedisStorageHandlers(ns); + + return getAsyncRedisStorageHandler(ns); #else logger->error() << "Redis operations cannot be performed, Redis not enabled"; SHAREDDATALAYER_ABORT("Invalid configuration."); @@ -86,7 +167,7 @@ AsyncStorage& AsyncStorageImpl::getDummyHandler() AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns) { if (namespaceConfigurations->isDbBackendUseEnabled(ns)) - return getRedisHandler(); + return getRedisHandler(ns); return getDummyHandler(); }