2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
23 #include "private/error.hpp"
24 #include "private/abort.hpp"
25 #include "private/asyncstorageimpl.hpp"
26 #include "private/configurationreader.hpp"
27 #include "private/asyncdummystorage.hpp"
28 #include "private/engine.hpp"
29 #include "private/logger.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
34 #include <boost/optional/optional_io.hpp>
35 #include <boost/crc.hpp>
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
42 std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
43 const std::string& ns,
44 const DatabaseConfiguration& databaseConfiguration,
45 const boost::optional<std::size_t>& addressIndex,
46 std::shared_ptr<Logger> logger)
48 return AsyncDatabaseDiscovery::create(engine,
50 databaseConfiguration,
55 std::uint32_t crc32(const std::string& s)
57 boost::crc_32_type result;
58 result.process_bytes(s.data(), s.size());
59 return result.checksum();
62 std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
64 return crc32(s)%count;
68 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
69 const boost::optional<PublisherId>& pId,
70 std::shared_ptr<Logger> logger):
72 databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
73 namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
76 asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
78 ConfigurationReader configurationReader(logger);
79 configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
80 configurationReader.readNamespaceConfigurations(std::ref(*namespaceConfigurations));
84 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
85 const boost::optional<PublisherId>& pId,
86 std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
87 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
88 std::shared_ptr<Logger> logger,
89 const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
91 databaseConfiguration(databaseConfiguration),
92 namespaceConfigurations(namespaceConfigurations),
95 asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
99 void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
101 for (std::size_t i = 0; i < databaseConfiguration->getServerAddresses().size(); i++)
103 auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
104 asyncDatabaseDiscoveryCreator(
107 std::ref(*databaseConfiguration),
111 namespaceConfigurations,
113 asyncStorages.push_back(redisHandler);
117 AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
119 std::size_t handlerIndex{0};
120 if (DatabaseConfiguration::DbType::SDL_CLUSTER == databaseConfiguration->getDbType())
121 handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
122 return *asyncStorages.at(handlerIndex);
125 AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
128 auto serverAddresses(databaseConfiguration->getServerAddresses());
129 if (asyncStorages.empty())
130 setAsyncRedisStorageHandlers(ns);
132 return getAsyncRedisStorageHandler(ns);
134 logger->error() << "Redis operations cannot be performed, Redis not enabled";
135 SHAREDDATALAYER_ABORT("Invalid configuration.");
139 AsyncStorage& AsyncStorageImpl::getDummyHandler()
141 static AsyncDummyStorage dummyHandler{engine};
145 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
147 if (namespaceConfigurations->isDbBackendUseEnabled(ns))
148 return getRedisHandler(ns);
150 return getDummyHandler();
153 int AsyncStorageImpl::fd() const
158 void AsyncStorageImpl::handleEvents()
160 engine->handleEvents();
163 void AsyncStorageImpl::waitReadyAsync(const Namespace& ns,
164 const ReadyAck& readyAck)
166 getOperationHandler(ns).waitReadyAsync(ns, readyAck);
169 void AsyncStorageImpl::setAsync(const Namespace& ns,
170 const DataMap& dataMap,
171 const ModifyAck& modifyAck)
173 getOperationHandler(ns).setAsync(ns, dataMap, modifyAck);
176 void AsyncStorageImpl::setIfAsync(const Namespace& ns,
180 const ModifyIfAck& modifyIfAck)
182 getOperationHandler(ns).setIfAsync(ns, key, oldData, newData, modifyIfAck);
185 void AsyncStorageImpl::removeIfAsync(const Namespace& ns,
188 const ModifyIfAck& modifyIfAck)
190 getOperationHandler(ns).removeIfAsync(ns, key, data, modifyIfAck);
193 void AsyncStorageImpl::setIfNotExistsAsync(const Namespace& ns,
196 const ModifyIfAck& modifyIfAck)
198 getOperationHandler(ns).setIfNotExistsAsync(ns, key, data, modifyIfAck);
201 void AsyncStorageImpl::getAsync(const Namespace& ns,
203 const GetAck& getAck)
205 getOperationHandler(ns).getAsync(ns, keys, getAck);
208 void AsyncStorageImpl::removeAsync(const Namespace& ns,
210 const ModifyAck& modifyAck)
212 getOperationHandler(ns).removeAsync(ns, keys, modifyAck);
215 void AsyncStorageImpl::findKeysAsync(const Namespace& ns,
216 const std::string& keyPrefix,
217 const FindKeysAck& findKeysAck)
219 getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck);
222 void AsyncStorageImpl::removeAllAsync(const Namespace& ns,
223 const ModifyAck& modifyAck)
225 getOperationHandler(ns).removeAllAsync(ns, modifyAck);