2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
23 #include "private/error.hpp"
24 #include "private/abort.hpp"
25 #include "private/asyncstorageimpl.hpp"
26 #include "private/configurationreader.hpp"
27 #include "private/asyncdummystorage.hpp"
28 #include "private/engine.hpp"
29 #include "private/logger.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
34 #include <boost/optional/optional_io.hpp>
35 #include <boost/crc.hpp>
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
42 std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
43 const std::string& ns,
44 const DatabaseConfiguration& databaseConfiguration,
45 const boost::optional<std::size_t>& addressIndex,
46 std::shared_ptr<Logger> logger)
48 return AsyncDatabaseDiscovery::create(engine,
50 databaseConfiguration,
55 std::uint32_t crc32(const std::string& s)
57 boost::crc_32_type result;
58 result.process_bytes(s.data(), s.size());
59 return result.checksum();
62 std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
64 return crc32(s)%count;
68 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
69 const boost::optional<PublisherId>& pId,
70 std::shared_ptr<Logger> logger):
72 databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
73 namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
76 asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
78 ConfigurationReader configurationReader(logger);
79 configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
80 configurationReader.readNamespaceConfigurations(std::ref(*namespaceConfigurations));
84 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
85 const boost::optional<PublisherId>& pId,
86 std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
87 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
88 std::shared_ptr<Logger> logger,
89 const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
91 databaseConfiguration(databaseConfiguration),
92 namespaceConfigurations(namespaceConfigurations),
95 asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
99 void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns)
101 static auto serverCount = databaseConfiguration->getServerAddresses().size();
102 for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++)
104 auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
105 asyncDatabaseDiscoveryCreator(
108 std::ref(*databaseConfiguration),
112 namespaceConfigurations,
114 asyncStorages.push_back(redisHandler);
118 void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
120 if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
121 DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
123 setAsyncRedisStorageHandlersForCluster(ns);
126 auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
127 asyncDatabaseDiscoveryCreator(
130 std::ref(*databaseConfiguration),
134 namespaceConfigurations,
136 asyncStorages.push_back(redisHandler);
139 AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
141 std::size_t handlerIndex{0};
142 if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
143 DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
144 handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
145 return *asyncStorages.at(handlerIndex);
148 AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
151 if (asyncStorages.empty())
152 setAsyncRedisStorageHandlers(ns);
154 return getAsyncRedisStorageHandler(ns);
156 logger->error() << "Redis operations cannot be performed, Redis not enabled";
157 SHAREDDATALAYER_ABORT("Invalid configuration.");
161 AsyncStorage& AsyncStorageImpl::getDummyHandler()
163 static AsyncDummyStorage dummyHandler{engine};
167 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
169 if (namespaceConfigurations->isDbBackendUseEnabled(ns))
170 return getRedisHandler(ns);
172 return getDummyHandler();
175 int AsyncStorageImpl::fd() const
180 void AsyncStorageImpl::handleEvents()
182 engine->handleEvents();
185 void AsyncStorageImpl::waitReadyAsync(const Namespace& ns,
186 const ReadyAck& readyAck)
188 getOperationHandler(ns).waitReadyAsync(ns, readyAck);
191 void AsyncStorageImpl::setAsync(const Namespace& ns,
192 const DataMap& dataMap,
193 const ModifyAck& modifyAck)
195 getOperationHandler(ns).setAsync(ns, dataMap, modifyAck);
198 void AsyncStorageImpl::setIfAsync(const Namespace& ns,
202 const ModifyIfAck& modifyIfAck)
204 getOperationHandler(ns).setIfAsync(ns, key, oldData, newData, modifyIfAck);
207 void AsyncStorageImpl::removeIfAsync(const Namespace& ns,
210 const ModifyIfAck& modifyIfAck)
212 getOperationHandler(ns).removeIfAsync(ns, key, data, modifyIfAck);
215 void AsyncStorageImpl::setIfNotExistsAsync(const Namespace& ns,
218 const ModifyIfAck& modifyIfAck)
220 getOperationHandler(ns).setIfNotExistsAsync(ns, key, data, modifyIfAck);
223 void AsyncStorageImpl::getAsync(const Namespace& ns,
225 const GetAck& getAck)
227 getOperationHandler(ns).getAsync(ns, keys, getAck);
230 void AsyncStorageImpl::removeAsync(const Namespace& ns,
232 const ModifyAck& modifyAck)
234 getOperationHandler(ns).removeAsync(ns, keys, modifyAck);
237 void AsyncStorageImpl::findKeysAsync(const Namespace& ns,
238 const std::string& keyPrefix,
239 const FindKeysAck& findKeysAck)
241 getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck);
244 void AsyncStorageImpl::listKeys(const Namespace& ns,
245 const std::string& pattern,
246 const FindKeysAck& findKeysAck)
248 getOperationHandler(ns).listKeys(ns, pattern, findKeysAck);
251 void AsyncStorageImpl::removeAllAsync(const Namespace& ns,
252 const ModifyAck& modifyAck)
254 getOperationHandler(ns).removeAllAsync(ns, modifyAck);