Multiple DBAAS Redis Sentinel groups
[ric-plt/sdl.git] / src / asyncstorageimpl.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "config.h"
23 #include "private/error.hpp"
24 #include "private/abort.hpp"
25 #include "private/asyncstorageimpl.hpp"
26 #include "private/configurationreader.hpp"
27 #include "private/asyncdummystorage.hpp"
28 #include "private/engine.hpp"
29 #include "private/logger.hpp"
30 #if HAVE_REDIS
31 #include "private/redis/asyncredisstorage.hpp"
32 #endif
33
34 #include <boost/optional/optional_io.hpp>
35 #include <boost/crc.hpp>
36
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
39
40 namespace
41 {
42         std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
43                                                                               const std::string& ns,
44                                                                               const DatabaseConfiguration& databaseConfiguration,
45                                                                               const boost::optional<std::size_t>& addressIndex,
46                                                                               std::shared_ptr<Logger> logger)
47         {
48             return AsyncDatabaseDiscovery::create(engine,
49                                                   ns,
50                                                   databaseConfiguration,
51                                                   addressIndex,
52                                                   logger);
53         }
54
55         std::uint32_t crc32(const std::string& s)
56         {
57            boost::crc_32_type result;
58            result.process_bytes(s.data(), s.size());
59            return result.checksum();
60         }
61
62         std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
63         {
64             return crc32(s)%count;
65         }
66 }
67
68 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
69                                    const boost::optional<PublisherId>& pId,
70                                    std::shared_ptr<Logger> logger):
71     engine(engine),
72     databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
73     namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
74     publisherId(pId),
75     logger(logger),
76     asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
77 {
78     ConfigurationReader configurationReader(logger);
79     configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
80     configurationReader.readNamespaceConfigurations(std::ref(*namespaceConfigurations));
81 }
82
83 // Meant for UT usage
84 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
85                                    const boost::optional<PublisherId>& pId,
86                                    std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
87                                    std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
88                                    std::shared_ptr<Logger> logger,
89                                    const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
90     engine(engine),
91     databaseConfiguration(databaseConfiguration),
92     namespaceConfigurations(namespaceConfigurations),
93     publisherId(pId),
94     logger(logger),
95     asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
96 {
97 }
98
99 void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
100 {
101     for (std::size_t i = 0; i < databaseConfiguration->getServerAddresses().size(); i++)
102     {
103         auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
104                                                                 asyncDatabaseDiscoveryCreator(
105                                                                         engine,
106                                                                         ns,
107                                                                         std::ref(*databaseConfiguration),
108                                                                         i,
109                                                                         logger),
110                                                                 publisherId,
111                                                                 namespaceConfigurations,
112                                                                 logger);
113         asyncStorages.push_back(redisHandler);
114     }
115 }
116
117 AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
118 {
119     std::size_t handlerIndex{0};
120     if (DatabaseConfiguration::DbType::SDL_CLUSTER == databaseConfiguration->getDbType())
121         handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
122     return *asyncStorages.at(handlerIndex);
123 }
124
125 AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
126 {
127 #if HAVE_REDIS
128     auto serverAddresses(databaseConfiguration->getServerAddresses());
129     if (asyncStorages.empty())
130             setAsyncRedisStorageHandlers(ns);
131
132     return getAsyncRedisStorageHandler(ns);
133 #else
134     logger->error() << "Redis operations cannot be performed, Redis not enabled";
135     SHAREDDATALAYER_ABORT("Invalid configuration.");
136 #endif
137 }
138
139 AsyncStorage& AsyncStorageImpl::getDummyHandler()
140 {
141     static AsyncDummyStorage dummyHandler{engine};
142     return dummyHandler;
143 }
144
145 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
146 {
147     if (namespaceConfigurations->isDbBackendUseEnabled(ns))
148         return getRedisHandler(ns);
149
150     return getDummyHandler();
151 }
152
153 int AsyncStorageImpl::fd() const
154 {
155     return engine->fd();
156 }
157
158 void AsyncStorageImpl::handleEvents()
159 {
160     engine->handleEvents();
161 }
162
163 void AsyncStorageImpl::waitReadyAsync(const Namespace& ns,
164                                       const ReadyAck& readyAck)
165 {
166     getOperationHandler(ns).waitReadyAsync(ns, readyAck);
167 }
168
169 void AsyncStorageImpl::setAsync(const Namespace& ns,
170                                 const DataMap& dataMap,
171                                 const ModifyAck& modifyAck)
172 {
173     getOperationHandler(ns).setAsync(ns, dataMap, modifyAck);
174 }
175
176 void AsyncStorageImpl::setIfAsync(const Namespace& ns,
177                                   const Key& key,
178                                   const Data& oldData,
179                                   const Data& newData,
180                                   const ModifyIfAck& modifyIfAck)
181 {
182     getOperationHandler(ns).setIfAsync(ns, key, oldData, newData, modifyIfAck);
183 }
184
185 void AsyncStorageImpl::removeIfAsync(const Namespace& ns,
186                                      const Key& key,
187                                      const Data& data,
188                                      const ModifyIfAck& modifyIfAck)
189 {
190     getOperationHandler(ns).removeIfAsync(ns, key, data, modifyIfAck);
191 }
192
193 void AsyncStorageImpl::setIfNotExistsAsync(const Namespace& ns,
194                                            const Key& key,
195                                            const Data& data,
196                                            const ModifyIfAck& modifyIfAck)
197 {
198     getOperationHandler(ns).setIfNotExistsAsync(ns, key, data, modifyIfAck);
199 }
200
201 void AsyncStorageImpl::getAsync(const Namespace& ns,
202                                 const Keys& keys,
203                                 const GetAck& getAck)
204 {
205     getOperationHandler(ns).getAsync(ns, keys, getAck);
206 }
207
208 void AsyncStorageImpl::removeAsync(const Namespace& ns,
209                                    const Keys& keys,
210                                    const ModifyAck& modifyAck)
211 {
212     getOperationHandler(ns).removeAsync(ns, keys, modifyAck);
213 }
214
215 void AsyncStorageImpl::findKeysAsync(const Namespace& ns,
216                                      const std::string& keyPrefix,
217                                      const FindKeysAck& findKeysAck)
218 {
219     getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck);
220 }
221
222 void AsyncStorageImpl::removeAllAsync(const Namespace& ns,
223                                        const ModifyAck& modifyAck)
224 {
225     getOperationHandler(ns).removeAllAsync(ns, modifyAck);
226 }