RIC:1060: Change in PTL
[ric-plt/sdl.git] / src / asyncstorageimpl.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "config.h"
23 #include "private/error.hpp"
24 #include "private/abort.hpp"
25 #include "private/asyncstorageimpl.hpp"
26 #include "private/configurationreader.hpp"
27 #include "private/asyncdummystorage.hpp"
28 #include "private/engine.hpp"
29 #include "private/logger.hpp"
30 #if HAVE_REDIS
31 #include "private/redis/asyncredisstorage.hpp"
32 #endif
33
34 #include <boost/optional/optional_io.hpp>
35 #include <boost/crc.hpp>
36
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
39
40 namespace
41 {
42         std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
43                                                                               const std::string& ns,
44                                                                               const DatabaseConfiguration& databaseConfiguration,
45                                                                               const boost::optional<std::size_t>& addressIndex,
46                                                                               std::shared_ptr<Logger> logger)
47         {
48             return AsyncDatabaseDiscovery::create(engine,
49                                                   ns,
50                                                   databaseConfiguration,
51                                                   addressIndex,
52                                                   logger);
53         }
54
55         std::uint32_t crc32(const std::string& s)
56         {
57            boost::crc_32_type result;
58            result.process_bytes(s.data(), s.size());
59            return result.checksum();
60         }
61
62         std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
63         {
64             return crc32(s)%count;
65         }
66 }
67
68 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
69                                    const boost::optional<PublisherId>& pId,
70                                    std::shared_ptr<Logger> logger):
71     engine(engine),
72     databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
73     namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
74     publisherId(pId),
75     logger(logger),
76     asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
77 {
78     ConfigurationReader configurationReader(logger);
79     configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
80     configurationReader.readNamespaceConfigurations(std::ref(*namespaceConfigurations));
81 }
82
83 // Meant for UT usage
84 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
85                                    const boost::optional<PublisherId>& pId,
86                                    std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
87                                    std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
88                                    std::shared_ptr<Logger> logger,
89                                    const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
90     engine(engine),
91     databaseConfiguration(databaseConfiguration),
92     namespaceConfigurations(namespaceConfigurations),
93     publisherId(pId),
94     logger(logger),
95     asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
96 {
97 }
98
99 void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns)
100 {
101     static auto serverCount = databaseConfiguration->getServerAddresses().size();
102     for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++)
103     {
104         auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
105                                                                 asyncDatabaseDiscoveryCreator(
106                                                                         engine,
107                                                                         ns,
108                                                                         std::ref(*databaseConfiguration),
109                                                                         addrIndex,
110                                                                         logger),
111                                                                 publisherId,
112                                                                 namespaceConfigurations,
113                                                                 logger);
114         asyncStorages.push_back(redisHandler);
115     }
116 }
117
118 void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
119 {
120     if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
121         DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
122     {
123             setAsyncRedisStorageHandlersForCluster(ns);
124             return;
125     }
126     auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
127                                                             asyncDatabaseDiscoveryCreator(
128                                                                     engine,
129                                                                     ns,
130                                                                     std::ref(*databaseConfiguration),
131                                                                     boost::none,
132                                                                     logger),
133                                                             publisherId,
134                                                             namespaceConfigurations,
135                                                             logger);
136     asyncStorages.push_back(redisHandler);
137 }
138
139 AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
140 {
141     std::size_t handlerIndex{0};
142     if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
143         DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
144         handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
145     return *asyncStorages.at(handlerIndex);
146 }
147
148 AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
149 {
150 #if HAVE_REDIS
151     if (asyncStorages.empty())
152             setAsyncRedisStorageHandlers(ns);
153
154     return getAsyncRedisStorageHandler(ns);
155 #else
156     logger->error() << "Redis operations cannot be performed, Redis not enabled";
157     SHAREDDATALAYER_ABORT("Invalid configuration.");
158 #endif
159 }
160
161 AsyncStorage& AsyncStorageImpl::getDummyHandler()
162 {
163     static AsyncDummyStorage dummyHandler{engine};
164     return dummyHandler;
165 }
166
167 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
168 {
169     if (namespaceConfigurations->isDbBackendUseEnabled(ns))
170         return getRedisHandler(ns);
171
172     return getDummyHandler();
173 }
174
175 int AsyncStorageImpl::fd() const
176 {
177     return engine->fd();
178 }
179
180 void AsyncStorageImpl::handleEvents()
181 {
182     engine->handleEvents();
183 }
184
185 void AsyncStorageImpl::waitReadyAsync(const Namespace& ns,
186                                       const ReadyAck& readyAck)
187 {
188     getOperationHandler(ns).waitReadyAsync(ns, readyAck);
189 }
190
191 void AsyncStorageImpl::setAsync(const Namespace& ns,
192                                 const DataMap& dataMap,
193                                 const ModifyAck& modifyAck)
194 {
195     getOperationHandler(ns).setAsync(ns, dataMap, modifyAck);
196 }
197
198 void AsyncStorageImpl::setIfAsync(const Namespace& ns,
199                                   const Key& key,
200                                   const Data& oldData,
201                                   const Data& newData,
202                                   const ModifyIfAck& modifyIfAck)
203 {
204     getOperationHandler(ns).setIfAsync(ns, key, oldData, newData, modifyIfAck);
205 }
206
207 void AsyncStorageImpl::removeIfAsync(const Namespace& ns,
208                                      const Key& key,
209                                      const Data& data,
210                                      const ModifyIfAck& modifyIfAck)
211 {
212     getOperationHandler(ns).removeIfAsync(ns, key, data, modifyIfAck);
213 }
214
215 void AsyncStorageImpl::setIfNotExistsAsync(const Namespace& ns,
216                                            const Key& key,
217                                            const Data& data,
218                                            const ModifyIfAck& modifyIfAck)
219 {
220     getOperationHandler(ns).setIfNotExistsAsync(ns, key, data, modifyIfAck);
221 }
222
223 void AsyncStorageImpl::getAsync(const Namespace& ns,
224                                 const Keys& keys,
225                                 const GetAck& getAck)
226 {
227     getOperationHandler(ns).getAsync(ns, keys, getAck);
228 }
229
230 void AsyncStorageImpl::removeAsync(const Namespace& ns,
231                                    const Keys& keys,
232                                    const ModifyAck& modifyAck)
233 {
234     getOperationHandler(ns).removeAsync(ns, keys, modifyAck);
235 }
236
237 void AsyncStorageImpl::findKeysAsync(const Namespace& ns,
238                                      const std::string& keyPrefix,
239                                      const FindKeysAck& findKeysAck)
240 {
241     getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck);
242 }
243
244 void AsyncStorageImpl::listKeys(const Namespace& ns,
245                                 const std::string& pattern,
246                                 const FindKeysAck& findKeysAck)
247 {
248     getOperationHandler(ns).listKeys(ns, pattern, findKeysAck);
249 }
250
251 void AsyncStorageImpl::removeAllAsync(const Namespace& ns,
252                                        const ModifyAck& modifyAck)
253 {
254     getOperationHandler(ns).removeAllAsync(ns, modifyAck);
255 }