X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fasyncstorageimpl.cpp;h=0d6d683ab5019acaf1fb105ce94fef31504e4738;hb=refs%2Fchanges%2F12%2F6712%2F5;hp=31ac713d998ee4785b7051e5ced4aa447bd91050;hpb=a0745d294dcd72f7d78ea4c3accd3b477dd668a5;p=ric-plt%2Fsdl.git diff --git a/src/asyncstorageimpl.cpp b/src/asyncstorageimpl.cpp index 31ac713..0d6d683 100644 --- a/src/asyncstorageimpl.cpp +++ b/src/asyncstorageimpl.cpp @@ -31,20 +31,38 @@ #include "private/redis/asyncredisstorage.hpp" #endif +#include +#include + using namespace shareddatalayer; using namespace shareddatalayer::redis; namespace { std::shared_ptr asyncDatabaseDiscoveryCreator(std::shared_ptr engine, + const std::string& ns, const DatabaseConfiguration& databaseConfiguration, + const boost::optional& addressIndex, std::shared_ptr logger) { return AsyncDatabaseDiscovery::create(engine, - boost::none, + ns, databaseConfiguration, + addressIndex, logger); } + + std::uint32_t crc32(const std::string& s) + { + boost::crc_32_type result; + result.process_bytes(s.data(), s.size()); + return result.checksum(); + } + + std::uint32_t getClusterHashIndex(const std::string& s, const size_t count) + { + return crc32(s)%count; + } } AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, @@ -78,19 +96,62 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr engine, { } -AsyncStorage& AsyncStorageImpl::getRedisHandler() +void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns) +{ + static auto serverCount = databaseConfiguration->getServerAddresses().size(); + for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++) + { + auto redisHandler = std::make_shared(engine, + asyncDatabaseDiscoveryCreator( + engine, + ns, + std::ref(*databaseConfiguration), + addrIndex, + logger), + publisherId, + namespaceConfigurations, + logger); + asyncStorages.push_back(redisHandler); + } +} + +void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns) +{ + if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() || + DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType()) + { + setAsyncRedisStorageHandlersForCluster(ns); + return; + } + auto redisHandler = std::make_shared(engine, + asyncDatabaseDiscoveryCreator( + engine, + ns, + std::ref(*databaseConfiguration), + boost::none, + logger), + publisherId, + namespaceConfigurations, + logger); + asyncStorages.push_back(redisHandler); +} + +AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns) +{ + std::size_t handlerIndex{0}; + if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() || + DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType()) + handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size()); + return *asyncStorages.at(handlerIndex); +} + +AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns) { #if HAVE_REDIS - static AsyncRedisStorage redisHandler{engine, - asyncDatabaseDiscoveryCreator( - engine, - std::ref(*databaseConfiguration), - logger), - publisherId, - namespaceConfigurations, - logger}; - - return redisHandler; + if (asyncStorages.empty()) + setAsyncRedisStorageHandlers(ns); + + return getAsyncRedisStorageHandler(ns); #else logger->error() << "Redis operations cannot be performed, Redis not enabled"; SHAREDDATALAYER_ABORT("Invalid configuration."); @@ -106,7 +167,7 @@ AsyncStorage& AsyncStorageImpl::getDummyHandler() AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns) { if (namespaceConfigurations->isDbBackendUseEnabled(ns)) - return getRedisHandler(); + return getRedisHandler(ns); return getDummyHandler(); } @@ -180,6 +241,13 @@ void AsyncStorageImpl::findKeysAsync(const Namespace& ns, getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck); } +void AsyncStorageImpl::listKeys(const Namespace& ns, + const std::string& pattern, + const FindKeysAck& findKeysAck) +{ + getOperationHandler(ns).listKeys(ns, pattern, findKeysAck); +} + void AsyncStorageImpl::removeAllAsync(const Namespace& ns, const ModifyAck& modifyAck) {