2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
24 #include "private/error.hpp"
25 #include <sdl/emptynamespace.hpp>
26 #include <sdl/invalidnamespace.hpp>
27 #include <sdl/publisherid.hpp>
28 #include "private/abort.hpp"
29 #include "private/createlogger.hpp"
30 #include "private/engine.hpp"
31 #include "private/logger.hpp"
32 #include "private/namespacevalidator.hpp"
33 #include "private/configurationreader.hpp"
34 #include "private/redis/asynccommanddispatcher.hpp"
35 #include "private/redis/asyncdatabasediscovery.hpp"
36 #include "private/redis/asyncredisstorage.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/redis/contentsbuilder.hpp"
39 #include "private/redis/redisgeneral.hpp"
40 #include "private/redis/reply.hpp"
42 using namespace shareddatalayer;
43 using namespace shareddatalayer::redis;
45 /* TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
46 * Once this new API is fully ready and tested, the old API implementation could be changed to utilize it
47 * (a bit like the sync API utilizes the async API).
// Factory function that produces the command dispatcher by calling
// AsyncCommandDispatcher::create(). It is the creator that the shorter
// AsyncRedisStorage constructor passes on as ::asyncCommandDispatcherCreator.
// NOTE(review): only the first argument (engine) of the create() call is visible
// in this view; confirm how databaseInfo, contentsBuilder and logger are forwarded.
52 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
53 const DatabaseInfo& databaseInfo,
54 std::shared_ptr<ContentsBuilder> contentsBuilder,
55 std::shared_ptr<Logger> logger)
57 return AsyncCommandDispatcher::create(engine,
// std::error_category implementation for AsyncRedisStorage::ErrorCode values.
// It overrides the three customisation points of std::error_category:
//  - name():                    the category identifier,
//  - message():                 a human-readable description of an error value,
//  - default_error_condition(): the mapping of an error value to an error condition.
65 class AsyncRedisStorageErrorCategory: public std::error_category
// Explicitly defaulted constructor; errorCategory() creates a const static instance of this class.
68 AsyncRedisStorageErrorCategory() = default;
70 const char* name() const noexcept override;
72 std::string message(int condition) const override;
74 std::error_condition default_error_condition(int condition) const noexcept override;
77 const char* AsyncRedisStorageErrorCategory::name() const noexcept
79 return "asyncredisstorage";
82 std::string AsyncRedisStorageErrorCategory::message(int condition) const
84 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
86 case AsyncRedisStorage::ErrorCode::SUCCESS:
87 return std::error_code().message();
88 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
89 return "connection to the underlying data storage not yet available";
90 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
91 return "invalid namespace identifier passed to SDL API";
92 case AsyncRedisStorage::ErrorCode::END_MARKER:
93 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
94 return "unsupported error code for message()";
96 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
// Maps an AsyncRedisStorage::ErrorCode value (passed as int) to the InternalError
// value used as its default error condition:
//   SUCCESS                  -> InternalError::SUCCESS
//   REDIS_NOT_YET_DISCOVERED -> InternalError::SDL_NOT_READY
//   INVALID_NAMESPACE        -> InternalError::SDL_RECEIVED_INVALID_PARAMETER
//   END_MARKER               -> InternalError::SDL_ERROR_CODE_LOGIC_ERROR (after logging)
// The trailing statements log a "default error condition missing" message and also
// return SDL_ERROR_CODE_LOGIC_ERROR.
// NOTE(review): the label introducing the trailing statements and the remainder of the
// stream insertion are not visible in this view; presumably this is the path for values
// without a dedicated case and the numeric value is appended to the message - confirm.
100 std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
102 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
104 case AsyncRedisStorage::ErrorCode::SUCCESS:
105 return InternalError::SUCCESS;
106 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
107 return InternalError::SDL_NOT_READY;
108 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
109 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
110 case AsyncRedisStorage::ErrorCode::END_MARKER:
// END_MARKER exists only for enum loop control, so being asked to map it is logged as an error.
111 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
112 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
114 std::ostringstream msg;
115 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
117 logErrorOnce(msg.str());
118 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
122 AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
124 AsyncStorage::DataMap dataMap;
126 for (const auto& j : keys)
128 if (replyVector[i]->getType() == Reply::Type::STRING)
130 AsyncStorage::Data data;
131 auto dataStr(replyVector[i]->getString());
132 for (ReplyStringLength k(0); k < dataStr->len; ++k)
133 data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
134 dataMap.insert({ j, data });
141 AsyncStorage::Key getKey(const Reply::DataItem& item)
143 std::string str(item.str.c_str(), static_cast<size_t>(item.len));
144 auto res(str.find(AsyncRedisStorage::SEPARATOR));
145 return str.substr(res + 1);
148 AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
150 AsyncStorage::Keys keys;
151 for (const auto& i : replyVector)
153 if (i->getType() == Reply::Type::STRING)
154 keys.insert(getKey(*i->getString()));
// Escapes, in place, every character that is special in a Redis search pattern
// (one of: * ? [ ] \) by putting a backslash in front of it.
//
// stringToProcess: text to escape; replaced by its escaped form. A text without
//                  any special character is left untouched.
//
// The escaped text is built in a single pass, so the cost is linear in the length
// of the input regardless of how many characters need escaping.
void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
{
    static const char redisSearchPatternCharacters[] = R"(*?[]\)";

    std::size_t foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters);
    if (foundPosition == std::string::npos)
        return;

    std::string escaped;
    // At least one extra character is needed; reserving the original size plus a
    // small margin avoids most reallocations without a separate counting pass.
    escaped.reserve(stringToProcess.size() + 8);

    std::size_t copiedUpTo = 0;
    while (foundPosition != std::string::npos)
    {
        // Copy the unescaped run in front of the special character, then the
        // backslash and the special character itself.
        escaped.append(stringToProcess, copiedUpTo, foundPosition - copiedUpTo);
        escaped.push_back('\\');
        escaped.push_back(stringToProcess[foundPosition]);
        copiedUpTo = foundPosition + 1;
        foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters, copiedUpTo);
    }
    escaped.append(stringToProcess, copiedUpTo, std::string::npos);

    stringToProcess.swap(escaped);
}
173 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
175 if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
176 throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
177 ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
181 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
183 return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
186 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
188 static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
189 return theAsyncRedisStorageErrorCategory;
// Convenience constructor: delegates to the constructor that additionally takes a
// dispatcher creator and a contents builder, supplying ::asyncCommandDispatcherCreator
// as the creator and a redis::ContentsBuilder constructed with SEPARATOR.
// NOTE(review): some arguments of the delegation are not visible in this view;
// presumably discovery, pId and logger are passed through unchanged - confirm.
192 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
193 std::shared_ptr<AsyncDatabaseDiscovery> discovery,
194 const boost::optional<PublisherId>& pId,
195 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
196 std::shared_ptr<Logger> logger):
197 AsyncRedisStorage(engine,
200 namespaceConfigurations,
201 ::asyncCommandDispatcherCreator,
202 std::make_shared<redis::ContentsBuilder>(SEPARATOR),
// Main constructor: stores the collaborators passed in, validates the publisher ID
// and subscribes to database state changes.
// Throws std::invalid_argument when publisherId holds a value that is an empty string.
// NOTE(review): parts of the member initializer list are not visible in this view
// (e.g. how engine, pId and logger are stored) - confirm against the full file.
207 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
208 std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
209 const boost::optional<PublisherId>& pId,
210 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
211 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
212 std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
213 std::shared_ptr<Logger> logger):
216 discovery(discovery),
218 asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
219 contentsBuilder(contentsBuilder),
220 namespaceConfigurations(namespaceConfigurations),
// A publisher ID that is present but empty is rejected; an absent one is accepted.
223 if(publisherId && (*publisherId).empty())
225 throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
// The callback captures `this`; the destructor calls discovery->clearStateChangedCb().
228 discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
230 serviceStateChanged(databaseInfo);
// Destructor: unregisters the state-change callback from discovery and disables
// the dispatcher's command callbacks. Both callbacks are set up with a captured
// `this` elsewhere in this class.
// NOTE(review): any null checks guarding these two calls are not visible in this view -
// confirm, since dispatcher is only assigned in serviceStateChanged().
234 AsyncRedisStorage::~AsyncRedisStorage()
237 discovery->clearStateChangedCb();
239 dispatcher->disableCommandCallbacks();
// Returns a mutable reference to a redis::DatabaseInfo.
// NOTE(review): the body is not visible in this view; presumably it returns the dbInfo
// member that serviceStateChanged() assigns - confirm.
242 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
// Handler for database state changes (installed as the discovery callback by the
// constructor). Creates a new dispatcher through asyncCommandDispatcherCreator,
// arranges for the stored readyAck to be invoked with a default (success) error code
// once the dispatcher reports being connected, and records newDatabaseInfo in dbInfo.
// NOTE(review): the remaining creator arguments and any condition guarding the
// waitConnectedAsync() call are not visible in this view - confirm.
247 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
249 dispatcher = asyncCommandDispatcherCreator(*engine,
254 dispatcher->waitConnectedAsync([this]()
256 readyAck(std::error_code());
// Reset the stored acknowledgement to a default-constructed ReadyAck after invoking it.
257 readyAck = ReadyAck();
259 dbInfo = newDatabaseInfo;
// Returns an integer file descriptor.
// NOTE(review): the body is not visible in this view; presumably the descriptor is
// obtained from the engine - confirm.
262 int AsyncRedisStorage::fd() const
267 void AsyncRedisStorage::handleEvents()
269 engine->handleEvents();
// Pre-check run by the storage operations before a command is dispatched.
//
// ns:          namespace of the request; checked with ::isValidNamespace().
// noKeysGiven: engaged and true when the caller passed an empty key/data set;
//              callers without such a set pass boost::none.
// ecToReturn:  out-parameter that receives the outcome:
//                - ErrorCode::INVALID_NAMESPACE for an invalid namespace (also logged),
//                - a default (success) error code when no keys were given,
//                - ErrorCode::REDIS_NOT_YET_DISCOVERED in a further failure case,
//                - a default (success) error code otherwise.
// NOTE(review): the return statements and the condition leading to
// REDIS_NOT_YET_DISCOVERED are not visible in this view. Callers treat a false result
// as "do not dispatch, report ecToReturn"; presumably the no-keys case returns false
// with a success code so that the caller acknowledges immediately - confirm.
272 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
273 boost::optional<bool> noKeysGiven,
274 std::error_code& ecToReturn)
276 if (!::isValidNamespace(ns))
278 logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
279 ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
282 if (noKeysGiven && *noKeysGiven)
284 ecToReturn = std::error_code();
289 ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
293 ecToReturn = std::error_code();
// Requests a notification for when the storage is ready for use. The namespace
// argument is unnamed and therefore unused.
//
// readyAck: callback; it is either handed to the dispatcher, which invokes it with a
//           default (success) error code once connected, or stored in this->readyAck,
//           which serviceStateChanged() invokes after a dispatcher has connected.
// NOTE(review): the condition choosing between the two paths is not visible in this
// view; presumably it is whether a dispatcher already exists - confirm.
297 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
298 const ReadyAck& readyAck)
// readyAck is captured by value, so the lambda owns its own copy of the callback.
301 dispatcher->waitConnectedAsync([readyAck]()
303 readyAck(std::error_code());
306 this->readyAck = readyAck;
// Writes the key-value pairs of dataMap to namespace ns.
//
// ns:        target namespace.
// dataMap:   data to write; its emptiness is passed to canOperationBePerformed().
// modifyAck: completion callback. When the pre-check fails it is posted to the engine
//            with the error code set by the pre-check; otherwise it is bound into
//            modificationCommandCallback for the dispatched command.
// The command is "MSETPUB" (with ns and getPublishMessage() as extra arguments) when
// notifications are enabled for ns, and "MSET" in the other dispatch.
// NOTE(review): the early return after postCallback and the `else` between the two
// dispatches are not visible in this view - confirm.
309 void AsyncRedisStorage::setAsync(const Namespace& ns,
310 const DataMap& dataMap,
311 const ModifyAck& modifyAck)
315 if (!canOperationBePerformed(ns, dataMap.empty(), ec))
// The acknowledgement is posted through the engine rather than invoked directly.
317 engine->postCallback(std::bind(modifyAck, ec));
321 if (namespaceConfigurations->areNotificationsEnabled(ns))
322 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
324 std::placeholders::_1,
325 std::placeholders::_2,
328 contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
330 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
332 std::placeholders::_1,
333 std::placeholders::_2,
336 contentsBuilder->build("MSET", ns, dataMap));
// Completion handler bound by setAsync() and removeAsync() for their dispatched commands.
//
// error:     error code reported for the command.
// modifyAck: the caller's completion callback.
// NOTE(review): the middle parameter and the body are not visible in this view;
// presumably the handler forwards error to modifyAck - confirm.
339 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
341 const ModifyAck& modifyAck )
// Completion handler bound by setIfAsync(), setIfNotExistsAsync() and removeIfAsync().
// Derives the boolean outcome of a conditional command from the reply and reports it,
// together with error, through modifyIfAck.
//
// The outcome is false when the reply is NIL or when it is an INTEGER other than 1
// (see the per-command notes on the condition lines); otherwise it is true.
// NOTE(review): the first line of the condition and the `reply` parameter declaration
// are not visible in this view; presumably a set error also yields false - confirm.
346 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
348 const ModifyIfAck& modifyIfAck)
350 auto type(reply.getType());
352 (type == Reply::Type::NIL) || // SETIE(PUB)
353 ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
354 modifyIfAck(error, false);
356 modifyIfAck(error, true);
// Conditional write: dispatches "SETIEPUB" (notifications enabled for ns; ns and
// getPublishMessage() are appended) or "SETIE" with key, newData and oldData.
//
// ns:          target namespace.
// modifyIfAck: completion callback. When the pre-check fails it is posted to the
//              engine with the pre-check error code and false; otherwise it is bound
//              into conditionalCommandCallback, which supplies the boolean outcome.
// NOTE(review): the key/oldData/newData parameter declarations, the early return and
// the `else` between the two dispatches are not visible in this view - confirm.
// The command name suggests "set if equal to oldData", but that is server-side
// behavior not shown here.
359 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
363 const ModifyIfAck& modifyIfAck)
// boost::none: this operation has no key set whose emptiness could short-circuit it.
367 if (!canOperationBePerformed(ns, boost::none, ec))
369 engine->postCallback(std::bind(modifyIfAck, ec, false));
373 if (namespaceConfigurations->areNotificationsEnabled(ns))
374 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
376 std::placeholders::_1,
377 std::placeholders::_2,
380 contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
382 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
384 std::placeholders::_1,
385 std::placeholders::_2,
388 contentsBuilder->build("SETIE", ns, key, newData, oldData));
// Conditional removal: dispatches "DELIEPUB" (notifications enabled for ns; ns and
// getPublishMessage() are appended) or "DELIE" with key and data.
//
// ns:          target namespace.
// modifyIfAck: completion callback. When the pre-check fails it is posted to the
//              engine with the pre-check error code and false; otherwise it is bound
//              into conditionalCommandCallback, which supplies the boolean outcome.
// NOTE(review): the key/data parameter declarations, the early return and the `else`
// between the two dispatches are not visible in this view - confirm.
391 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
394 const ModifyIfAck& modifyIfAck)
398 if (!canOperationBePerformed(ns, boost::none, ec))
400 engine->postCallback(std::bind(modifyIfAck, ec, false));
404 if (namespaceConfigurations->areNotificationsEnabled(ns))
405 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
407 std::placeholders::_1,
408 std::placeholders::_2,
411 contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
413 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
415 std::placeholders::_1,
416 std::placeholders::_2,
419 contentsBuilder->build("DELIE", ns, key, data));
// Returns the message text that the *PUB command variants (MSETPUB, SETIEPUB,
// DELIEPUB, SETNXPUB, DELPUB) pass as their last argument.
// NOTE(review): the body is not visible in this view; presumably the text is derived
// from the publisher ID given to the constructor - confirm.
422 std::string AsyncRedisStorage::getPublishMessage() const
// Conditional write: dispatches "SETNXPUB" (notifications enabled for ns; ns and
// getPublishMessage() are appended) or "SETNX" with key and data.
//
// ns:          target namespace.
// modifyIfAck: completion callback. When the pre-check fails it is posted to the
//              engine with the pre-check error code and false; otherwise it is bound
//              into conditionalCommandCallback, which supplies the boolean outcome.
// NOTE(review): the key/data parameter declarations, the early return and the `else`
// between the two dispatches are not visible in this view - confirm.
430 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
433 const ModifyIfAck& modifyIfAck)
437 if (!canOperationBePerformed(ns, boost::none, ec))
439 engine->postCallback(std::bind(modifyIfAck, ec, false));
443 if (namespaceConfigurations->areNotificationsEnabled(ns))
444 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
446 std::placeholders::_1,
447 std::placeholders::_2,
450 contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
452 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
454 std::placeholders::_1,
455 std::placeholders::_2,
458 contentsBuilder->build("SETNX", ns, key, data));
// Reads the values of keys from namespace ns by dispatching an "MGET" command.
//
// ns:     namespace to read from.
// getAck: completion callback. When the pre-check fails it is posted to the engine
//         with the pre-check error code and an empty DataMap. In the command callback
//         it receives either the error with an empty DataMap, or a default (success)
//         error code with the map built by buildDataMap() from the reply array.
// NOTE(review): the keys parameter declaration, the early return and the condition
// selecting between the two getAck calls are not visible in this view; presumably the
// first one is taken when error is set - confirm.
461 void AsyncRedisStorage::getAsync(const Namespace& ns,
463 const GetAck& getAck)
467 if (!canOperationBePerformed(ns, keys.empty(), ec))
469 engine->postCallback(std::bind(getAck, ec, DataMap()));
// keys is captured by value: buildDataMap() needs it when the reply arrives later.
473 dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
477 getAck(error, DataMap());
479 getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
482 contentsBuilder->build("MGET", ns, keys));
// Removes keys from namespace ns: dispatches "DELPUB" (notifications enabled for ns;
// ns and getPublishMessage() are appended) or "DEL" with the keys.
//
// ns:        target namespace.
// modifyAck: completion callback. When the pre-check fails it is posted to the engine
//            with the pre-check error code; otherwise it is bound into
//            modificationCommandCallback for the dispatched command.
// NOTE(review): the keys parameter declaration, the early return and the `else`
// between the two dispatches are not visible in this view - confirm.
485 void AsyncRedisStorage::removeAsync(const Namespace& ns,
487 const ModifyAck& modifyAck)
491 if (!canOperationBePerformed(ns, keys.empty(), ec))
493 engine->postCallback(std::bind(modifyAck, ec));
497 if (namespaceConfigurations->areNotificationsEnabled(ns))
498 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
500 std::placeholders::_1,
501 std::placeholders::_2,
504 contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
506 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
508 std::placeholders::_1,
509 std::placeholders::_2,
512 contentsBuilder->build("DEL", ns, keys));
// Finds the keys of namespace ns that start with keyPrefix by dispatching a "KEYS"
// command with the pattern from buildKeyPrefixSearchPattern(ns, keyPrefix).
//
// ns:          namespace to search.
// keyPrefix:   prefix the keys must start with.
// findKeysAck: completion callback. When the pre-check fails it is posted to the
//              engine with the pre-check error code and an empty key set. In the
//              command callback it receives either the error with an empty key set,
//              or a default (success) error code with the keys that getKeys()
//              extracts from the reply array.
// NOTE(review): the early return and the condition selecting between the two
// findKeysAck calls are not visible in this view; presumably the first one is taken
// when error is set - confirm.
515 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
516 const std::string& keyPrefix,
517 const FindKeysAck& findKeysAck)
519 //TODO: update to more optimal solution than current KEYS-based one.
522 if (!canOperationBePerformed(ns, boost::none, ec))
524 engine->postCallback(std::bind(findKeysAck, ec, Keys()));
528 dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
531 findKeysAck(error, Keys());
533 findKeysAck(std::error_code(), getKeys(*reply.getArray()));
536 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
// Removes every key of namespace ns in two steps: a "KEYS" command with the pattern
// from buildKeyPrefixSearchPattern(ns, "") lists the keys, and the command callback
// then passes the keys extracted by getKeys() to removeAsync().
//
// ns:        namespace to clear.
// modifyAck: completion callback. When the pre-check fails it is posted to the engine
//            with the pre-check error code. Inside the command callback it is either
//            invoked directly with a default (success) error code or handed on to
//            removeAsync().
// NOTE(review): the early return, the error handling and the condition choosing
// between the direct acknowledgement and removeAsync() are not visible in this view;
// presumably the direct acknowledgement is used when the reply array is empty - confirm.
539 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
540 const ModifyAck& modifyAck)
544 if (!canOperationBePerformed(ns, boost::none, ec))
546 engine->postCallback(std::bind(modifyAck, ec));
// `this` is captured because the callback calls the removeAsync() member; ns and
// modifyAck are captured by value for use when the reply arrives.
550 dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
557 const auto& array(*reply.getArray());
559 modifyAck(std::error_code());
562 removeAsync(ns, getKeys(array), modifyAck);
566 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
569 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
571 std::string escapedKeyPrefix = keyPrefix;
572 escapeRedisSearchPatternCharacters(escapedKeyPrefix);
573 std::ostringstream oss;
574 oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";