New listKeys() API to support glob-style key search patterns
[ric-plt/sdl.git] / src / redis / asyncredisstorage.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "config.h"
23 #include <sstream>
24 #include "private/error.hpp"
25 #include <sdl/emptynamespace.hpp>
26 #include <sdl/invalidnamespace.hpp>
27 #include <sdl/publisherid.hpp>
28 #include "private/abort.hpp"
29 #include "private/createlogger.hpp"
30 #include "private/engine.hpp"
31 #include "private/logger.hpp"
32 #include "private/namespacevalidator.hpp"
33 #include "private/configurationreader.hpp"
34 #include "private/redis/asynccommanddispatcher.hpp"
35 #include "private/redis/asyncdatabasediscovery.hpp"
36 #include "private/redis/asyncredisstorage.hpp"
37 #include "private/redis/contents.hpp"
38 #include "private/redis/contentsbuilder.hpp"
39 #include "private/redis/redisgeneral.hpp"
40 #include "private/redis/reply.hpp"
41
42 using namespace shareddatalayer;
43 using namespace shareddatalayer::redis;
44
/*  TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
 *  Once this new API is fully ready and tested, the old API implementation could be changed to utilize it
 *  (much like the sync API utilizes the async API).
 */
49
50 namespace
51 {
52     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
53                                                                           const DatabaseInfo& databaseInfo,
54                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
55                                                                           std::shared_ptr<Logger> logger)
56     {
57         return AsyncCommandDispatcher::create(engine,
58                                               databaseInfo,
59                                               contentsBuilder,
60                                               false,
61                                               logger,
62                                               false);
63     }
64
65     class AsyncRedisStorageErrorCategory: public std::error_category
66     {
67     public:
68         AsyncRedisStorageErrorCategory() = default;
69
70         const char* name() const noexcept override;
71
72         std::string message(int condition) const override;
73
74         std::error_condition default_error_condition(int condition) const noexcept override;
75     };
76
77     const char* AsyncRedisStorageErrorCategory::name() const noexcept
78     {
79         return "asyncredisstorage";
80     }
81
82     std::string AsyncRedisStorageErrorCategory::message(int condition) const
83     {
84         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
85         {
86             case AsyncRedisStorage::ErrorCode::SUCCESS:
87                 return std::error_code().message();
88             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
89                 return "connection to the underlying data storage not yet available";
90             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
91                 return "invalid namespace identifier passed to SDL API";
92             case AsyncRedisStorage::ErrorCode::END_MARKER:
93                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
94                 return "unsupported error code for message()";
95             default:
96                 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
97         }
98     }
99
100     std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
101     {
102         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
103         {
104             case AsyncRedisStorage::ErrorCode::SUCCESS:
105                 return InternalError::SUCCESS;
106             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
107                 return InternalError::SDL_NOT_READY;
108             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
109                 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
110             case AsyncRedisStorage::ErrorCode::END_MARKER:
111                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
112                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
113             default:
114                 std::ostringstream msg;
115                 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
116                     << condition;
117                 logErrorOnce(msg.str());
118                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
119         }
120     }
121
122     AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
123     {
124         AsyncStorage::DataMap dataMap;
125         auto i(0U);
126         for (const auto& j : keys)
127         {
128             if (replyVector[i]->getType() == Reply::Type::STRING)
129             {
130                 AsyncStorage::Data data;
131                 auto dataStr(replyVector[i]->getString());
132                 for (ReplyStringLength k(0); k < dataStr->len; ++k)
133                     data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
134                 dataMap.insert({ j, data });
135             }
136             ++i;
137         }
138         return dataMap;
139     }
140
141     AsyncStorage::Key getKey(const Reply::DataItem& item)
142     {
143         std::string str(item.str.c_str(), static_cast<size_t>(item.len));
144         auto res(str.find(AsyncRedisStorage::SEPARATOR));
145         return str.substr(res + 1);
146     }
147
148     AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
149     {
150         AsyncStorage::Keys keys;
151         for (const auto& i : replyVector)
152         {
153             if (i->getType() == Reply::Type::STRING)
154                 keys.insert(getKey(*i->getString()));
155         }
156         return keys;
157     }
158
159     void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
160     {
161         const std::string redisSearchPatternCharacters = R"(*?[]\)";
162
163         std::size_t foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters);
164
165         while (foundPosition != std::string::npos)
166         {
167             stringToProcess.insert(foundPosition, R"(\)");
168             foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters, foundPosition + 2);
169         }
170     }
171 }
172
173 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
174 {
175     if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
176         throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
177     ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
178     return ecEnum;
179 }
180
181 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
182 {
183     return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
184 }
185
186 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
187 {
188     static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
189     return theAsyncRedisStorageErrorCategory;
190 }
191
192 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
193                                      std::shared_ptr<AsyncDatabaseDiscovery> discovery,
194                                      const boost::optional<PublisherId>& pId,
195                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
196                                      std::shared_ptr<Logger> logger):
197     AsyncRedisStorage(engine,
198                       discovery,
199                       pId,
200                       namespaceConfigurations,
201                       ::asyncCommandDispatcherCreator,
202                       std::make_shared<redis::ContentsBuilder>(SEPARATOR),
203                       logger)
204 {
205 }
206
207 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
208                                      std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
209                                      const boost::optional<PublisherId>& pId,
210                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
211                                      const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
212                                      std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
213                                      std::shared_ptr<Logger> logger):
214     engine(engine),
215     dispatcher(nullptr),
216     discovery(discovery),
217     publisherId(pId),
218     asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
219     contentsBuilder(contentsBuilder),
220     namespaceConfigurations(namespaceConfigurations),
221     logger(logger)
222 {
223     if(publisherId && (*publisherId).empty())
224     {
225         throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
226     }
227
228     discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
229                                  {
230                                      serviceStateChanged(databaseInfo);
231                                  });
232 }
233
234 AsyncRedisStorage::~AsyncRedisStorage()
235 {
236     if (discovery)
237         discovery->clearStateChangedCb();
238     if (dispatcher)
239         dispatcher->disableCommandCallbacks();
240 }
241
242 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
243 {
244     return dbInfo;
245 }
246
247 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
248 {
249     dispatcher = asyncCommandDispatcherCreator(*engine,
250                                                newDatabaseInfo,
251                                                contentsBuilder,
252                                                logger);
253     if (readyAck)
254         dispatcher->waitConnectedAsync([this]()
255                                        {
256                                            readyAck(std::error_code());
257                                            readyAck = ReadyAck();
258                                        });
259     dbInfo = newDatabaseInfo;
260 }
261
262 int AsyncRedisStorage::fd() const
263 {
264     return engine->fd();
265 }
266
267 void AsyncRedisStorage::handleEvents()
268 {
269     engine->handleEvents();
270 }
271
272 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
273                                                 boost::optional<bool> noKeysGiven,
274                                                 std::error_code& ecToReturn)
275 {
276     if (!::isValidNamespace(ns))
277     {
278         logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
279         ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
280         return false;
281     }
282     if (noKeysGiven && *noKeysGiven)
283     {
284         ecToReturn = std::error_code();
285         return false;
286     }
287     if (!dispatcher)
288     {
289         ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
290         return false;
291     }
292
293     ecToReturn = std::error_code();
294     return true;
295 }
296
297 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
298                                        const ReadyAck& readyAck)
299 {
300     if (dispatcher)
301         dispatcher->waitConnectedAsync([readyAck]()
302                                        {
303                                            readyAck(std::error_code());
304                                        });
305     else
306         this->readyAck = readyAck;
307 }
308
309 void AsyncRedisStorage::setAsync(const Namespace& ns,
310                                  const DataMap& dataMap,
311                                  const ModifyAck& modifyAck)
312 {
313     std::error_code ec;
314
315     if (!canOperationBePerformed(ns, dataMap.empty(), ec))
316     {
317         engine->postCallback(std::bind(modifyAck, ec));
318         return;
319     }
320
321     if (namespaceConfigurations->areNotificationsEnabled(ns))
322         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
323                                             this,
324                                             std::placeholders::_1,
325                                             std::placeholders::_2,
326                                             modifyAck),
327                                   ns,
328                                   contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
329     else
330         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
331                                             this,
332                                             std::placeholders::_1,
333                                             std::placeholders::_2,
334                                             modifyAck),
335                                   ns,
336                                   contentsBuilder->build("MSET", ns, dataMap));
337 }
338
339 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
340                                                     const Reply&,
341                                                     const ModifyAck& modifyAck )
342 {
343     modifyAck(error);
344 }
345
346 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
347                                                    const Reply& reply,
348                                                    const ModifyIfAck& modifyIfAck)
349 {
350     auto type(reply.getType());
351     if (error ||
352         (type == Reply::Type::NIL) || // SETIE(PUB)
353         ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
354         modifyIfAck(error, false);
355     else
356         modifyIfAck(error, true);
357 }
358
359 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
360                                    const Key& key,
361                                    const Data& oldData,
362                                    const Data& newData,
363                                    const ModifyIfAck& modifyIfAck)
364 {
365     std::error_code ec;
366
367     if (!canOperationBePerformed(ns, boost::none, ec))
368     {
369         engine->postCallback(std::bind(modifyIfAck, ec, false));
370         return;
371     }
372
373     if (namespaceConfigurations->areNotificationsEnabled(ns))
374         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
375                                             this,
376                                             std::placeholders::_1,
377                                             std::placeholders::_2,
378                                             modifyIfAck),
379                                   ns,
380                                   contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
381     else
382         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
383                                             this,
384                                             std::placeholders::_1,
385                                             std::placeholders::_2,
386                                             modifyIfAck),
387                                   ns,
388                                   contentsBuilder->build("SETIE", ns, key, newData, oldData));
389 }
390
391 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
392                                       const Key& key,
393                                       const Data& data,
394                                       const ModifyIfAck& modifyIfAck)
395 {
396     std::error_code ec;
397
398     if (!canOperationBePerformed(ns, boost::none, ec))
399     {
400         engine->postCallback(std::bind(modifyIfAck, ec, false));
401         return;
402     }
403
404     if (namespaceConfigurations->areNotificationsEnabled(ns))
405         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
406                                             this,
407                                             std::placeholders::_1,
408                                             std::placeholders::_2,
409                                             modifyIfAck),
410                                   ns,
411                                   contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
412     else
413         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
414                                             this,
415                                             std::placeholders::_1,
416                                             std::placeholders::_2,
417                                             modifyIfAck),
418                                   ns,
419                                   contentsBuilder->build("DELIE", ns, key, data));
420 }
421
422 std::string AsyncRedisStorage::getPublishMessage() const
423 {
424     if(publisherId)
425         return *publisherId;
426     else
427         return NO_PUBLISHER;
428 }
429
430 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
431                                             const Key& key,
432                                             const Data& data,
433                                             const ModifyIfAck& modifyIfAck)
434 {
435     std::error_code ec;
436
437     if (!canOperationBePerformed(ns, boost::none, ec))
438     {
439         engine->postCallback(std::bind(modifyIfAck, ec, false));
440         return;
441     }
442
443     if (namespaceConfigurations->areNotificationsEnabled(ns))
444         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
445                                             this,
446                                             std::placeholders::_1,
447                                             std::placeholders::_2,
448                                             modifyIfAck),
449                                   ns,
450                                   contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
451     else
452         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
453                                             this,
454                                             std::placeholders::_1,
455                                             std::placeholders::_2,
456                                             modifyIfAck),
457                                   ns,
458                                   contentsBuilder->build("SETNX", ns, key, data));
459 }
460
461 void AsyncRedisStorage::getAsync(const Namespace& ns,
462                                  const Keys& keys,
463                                  const GetAck& getAck)
464 {
465     std::error_code ec;
466
467     if (!canOperationBePerformed(ns, keys.empty(), ec))
468     {
469         engine->postCallback(std::bind(getAck, ec, DataMap()));
470         return;
471     }
472
473     dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
474                                                  const Reply& reply)
475                               {
476                                   if (error)
477                                       getAck(error, DataMap());
478                                   else
479                                       getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
480                               },
481                               ns,
482                               contentsBuilder->build("MGET", ns, keys));
483 }
484
485 void AsyncRedisStorage::removeAsync(const Namespace& ns,
486                                     const Keys& keys,
487                                     const ModifyAck& modifyAck)
488 {
489     std::error_code ec;
490
491     if (!canOperationBePerformed(ns, keys.empty(), ec))
492     {
493         engine->postCallback(std::bind(modifyAck, ec));
494         return;
495     }
496
497     if (namespaceConfigurations->areNotificationsEnabled(ns))
498         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
499                                             this,
500                                             std::placeholders::_1,
501                                             std::placeholders::_2,
502                                             modifyAck),
503                                   ns,
504                                   contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
505     else
506         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
507                                             this,
508                                             std::placeholders::_1,
509                                             std::placeholders::_2,
510                                             modifyAck),
511                                   ns,
512                                   contentsBuilder->build("DEL", ns, keys));
513 }
514
515 void AsyncRedisStorage::findKeys(const Namespace& ns,
516                                  const std::string& keyPattern,
517                                  const FindKeysAck& findKeysAck)
518 {
519     //TODO: update to more optimal solution than current KEYS-based one.
520     std::error_code ec;
521
522     if (!canOperationBePerformed(ns, boost::none, ec))
523     {
524         engine->postCallback(std::bind(findKeysAck, ec, Keys()));
525         return;
526     }
527
528     dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
529                               {
530                                   if (error)
531                                       findKeysAck(error, Keys());
532                                   else
533                                       findKeysAck(std::error_code(), getKeys(*reply.getArray()));
534                               },
535                               ns,
536                               contentsBuilder->build("KEYS", keyPattern));
537 }
538
539 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
540                                       const std::string& keyPrefix,
541                                       const FindKeysAck& findKeysAck)
542 {
543     auto keyPattern(buildKeyPrefixSearchPattern(ns, keyPrefix));
544     findKeys(ns, keyPattern, findKeysAck);
545 }
546
547 void AsyncRedisStorage::listKeys(const Namespace& ns,
548                                  const std::string& pattern,
549                                  const FindKeysAck& findKeysAck)
550 {
551     auto keyPattern(buildNamespaceKeySearchPattern(ns, pattern));
552     findKeys(ns, keyPattern, findKeysAck);
553 }
554
555 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
556                                        const ModifyAck& modifyAck)
557 {
558     std::error_code ec;
559
560     if (!canOperationBePerformed(ns, boost::none, ec))
561     {
562         engine->postCallback(std::bind(modifyAck, ec));
563         return;
564     }
565
566     dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
567                               {
568                                   if (error)
569                                   {
570                                       modifyAck(error);
571                                       return;
572                                   }
573                                   const auto& array(*reply.getArray());
574                                   if (array.empty())
575                                       modifyAck(std::error_code());
576                                   else
577                                   {
578                                       removeAsync(ns, getKeys(array), modifyAck);
579                                   }
580                               },
581                               ns,
582                               contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
583 }
584
585 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
586 {
587     std::string escapedKeyPrefix = keyPrefix;
588     escapeRedisSearchPatternCharacters(escapedKeyPrefix);
589     std::ostringstream oss;
590     oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";
591     return oss.str();
592 }
593
594 std::string AsyncRedisStorage::buildNamespaceKeySearchPattern(const Namespace& ns,
595                                                               const std::string& pattern) const
596 {
597     std::ostringstream oss;
598     oss << '{' << ns << '}' << SEPARATOR << pattern;
599     return oss.str();
600 }