limitations under the License.
*/
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+*/
+
#include "config.h"
#include "private/error.hpp"
#include "private/abort.hpp"
#include "private/engine.hpp"
#include "private/logger.hpp"
#if HAVE_REDIS
-#include "private/redis/asyncdatabasediscovery.hpp"
#include "private/redis/asyncredisstorage.hpp"
#endif
+#include <boost/optional/optional_io.hpp>
+#include <boost/crc.hpp>
+
using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+ std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+ const std::string& ns,
+ const DatabaseConfiguration& databaseConfiguration,
+ const boost::optional<std::size_t>& addressIndex,
+ std::shared_ptr<Logger> logger)
+ {
+ return AsyncDatabaseDiscovery::create(engine,
+ ns,
+ databaseConfiguration,
+ addressIndex,
+ logger);
+ }
+
+ std::uint32_t crc32(const std::string& s)
+ {
+ boost::crc_32_type result;
+ result.process_bytes(s.data(), s.size());
+ return result.checksum();
+ }
+
+ std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
+ {
+ return crc32(s)%count;
+ }
+}
AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
const boost::optional<PublisherId>& pId,
databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
publisherId(pId),
- logger(logger)
+ logger(logger),
+ asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
{
ConfigurationReader configurationReader(logger);
configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
const boost::optional<PublisherId>& pId,
std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
- std::shared_ptr<Logger> logger):
+ std::shared_ptr<Logger> logger,
+ const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
engine(engine),
databaseConfiguration(databaseConfiguration),
namespaceConfigurations(namespaceConfigurations),
publisherId(pId),
- logger(logger)
-{
-}
-
-AsyncStorage& AsyncStorageImpl::getRedisHandler()
+ logger(logger),
+ asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
+{
+}
+
+void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns)
+{
+ static auto serverCount = databaseConfiguration->getServerAddresses().size();
+ for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++)
+ {
+ auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+ asyncDatabaseDiscoveryCreator(
+ engine,
+ ns,
+ std::ref(*databaseConfiguration),
+ addrIndex,
+ logger),
+ publisherId,
+ namespaceConfigurations,
+ logger);
+ asyncStorages.push_back(redisHandler);
+ }
+}
+
+void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
+{
+ if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+ DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+ {
+ setAsyncRedisStorageHandlersForCluster(ns);
+ return;
+ }
+ auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+ asyncDatabaseDiscoveryCreator(
+ engine,
+ ns,
+ std::ref(*databaseConfiguration),
+ boost::none,
+ logger),
+ publisherId,
+ namespaceConfigurations,
+ logger);
+ asyncStorages.push_back(redisHandler);
+}
+
+AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
+{
+ std::size_t handlerIndex{0};
+ if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+ DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+ handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
+ return *asyncStorages.at(handlerIndex);
+}
+
+AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
{
#if HAVE_REDIS
- static AsyncRedisStorage redisHandler{engine,
- redis::AsyncDatabaseDiscovery::create(
- engine,
- boost::none,
- std::ref(*databaseConfiguration),
- logger),
- publisherId,
- namespaceConfigurations,
- logger};
-
- return redisHandler;
+ if (asyncStorages.empty())
+ setAsyncRedisStorageHandlers(ns);
+
+ return getAsyncRedisStorageHandler(ns);
#else
logger->error() << "Redis operations cannot be performed, Redis not enabled";
SHAREDDATALAYER_ABORT("Invalid configuration.");
AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
{
if (namespaceConfigurations->isDbBackendUseEnabled(ns))
- return getRedisHandler();
+ return getRedisHandler(ns);
return getDummyHandler();
}