22d4b2b969335f4955d7aa76178881bebd8283a6
[ric-plt/sdl.git] / src / redis / asyncredisstorage.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "config.h"
18 #include <sstream>
19 #include "private/error.hpp"
20 #include <sdl/emptynamespace.hpp>
21 #include <sdl/invalidnamespace.hpp>
22 #include <sdl/publisherid.hpp>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/logger.hpp"
27 #include "private/namespacevalidator.hpp"
28 #include "private/configurationreader.hpp"
29 #include "private/redis/asynccommanddispatcher.hpp"
30 #include "private/redis/asyncdatabasediscovery.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/redisgeneral.hpp"
35 #include "private/redis/reply.hpp"
36
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
39
/*  TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
 *  When this new API is fully ready and tested, the old API implementation could be changed to utilize it
 *  (a bit like the sync API utilizes the async API).
 */
44
45 namespace
46 {
47     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
48                                                                           const DatabaseInfo& databaseInfo,
49                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
50                                                                           std::shared_ptr<Logger> logger)
51     {
52         return AsyncCommandDispatcher::create(engine,
53                                               databaseInfo,
54                                               contentsBuilder,
55                                               false,
56                                               logger);
57     }
58
59     class AsyncRedisStorageErrorCategory: public std::error_category
60     {
61     public:
62         AsyncRedisStorageErrorCategory() = default;
63
64         const char* name() const noexcept override;
65
66         std::string message(int condition) const override;
67
68         std::error_condition default_error_condition(int condition) const noexcept override;
69     };
70
71     const char* AsyncRedisStorageErrorCategory::name() const noexcept
72     {
73         return "asyncredisstorage";
74     }
75
76     std::string AsyncRedisStorageErrorCategory::message(int condition) const
77     {
78         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
79         {
80             case AsyncRedisStorage::ErrorCode::SUCCESS:
81                 return std::error_code().message();
82             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
83                 return "connection to the underlying data storage not yet available";
84             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
85                 return "invalid namespace identifier passed to SDL API";
86             case AsyncRedisStorage::ErrorCode::END_MARKER:
87                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
88                 return "unsupported error code for message()";
89             default:
90                 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
91         }
92     }
93
94     std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
95     {
96         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
97         {
98             case AsyncRedisStorage::ErrorCode::SUCCESS:
99                 return InternalError::SUCCESS;
100             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
101                 return InternalError::SDL_NOT_READY;
102             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
103                 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
104             case AsyncRedisStorage::ErrorCode::END_MARKER:
105                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
106                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
107             default:
108                 std::ostringstream msg;
109                 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
110                     << condition;
111                 logErrorOnce(msg.str());
112                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
113         }
114     }
115
116     AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
117     {
118         AsyncStorage::DataMap dataMap;
119         auto i(0U);
120         for (const auto& j : keys)
121         {
122             if (replyVector[i]->getType() == Reply::Type::STRING)
123             {
124                 AsyncStorage::Data data;
125                 auto dataStr(replyVector[i]->getString());
126                 for (ReplyStringLength k(0); k < dataStr->len; ++k)
127                     data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
128                 dataMap.insert({ j, data });
129             }
130             ++i;
131         }
132         return dataMap;
133     }
134
135     AsyncStorage::Key getKey(const Reply::DataItem& item)
136     {
137         std::string str(item.str.c_str(), static_cast<size_t>(item.len));
138         auto res(str.find(AsyncRedisStorage::SEPARATOR));
139         return str.substr(res + 1);
140     }
141
142     AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
143     {
144         AsyncStorage::Keys keys;
145         for (const auto& i : replyVector)
146         {
147             if (i->getType() == Reply::Type::STRING)
148                 keys.insert(getKey(*i->getString()));
149         }
150         return keys;
151     }
152
153     void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
154     {
155         const std::string redisSearchPatternCharacters = R"(*?[]\)";
156
157         std::size_t foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters);
158
159         while (foundPosition != std::string::npos)
160         {
161             stringToProcess.insert(foundPosition, R"(\)");
162             foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters, foundPosition + 2);
163         }
164     }
165 }
166
167 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
168 {
169     if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
170         throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
171     ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
172     return ecEnum;
173 }
174
175 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
176 {
177     return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
178 }
179
180 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
181 {
182     static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
183     return theAsyncRedisStorageErrorCategory;
184 }
185
186 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
187                                      std::shared_ptr<AsyncDatabaseDiscovery> discovery,
188                                      const boost::optional<PublisherId>& pId,
189                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
190                                      std::shared_ptr<Logger> logger):
191     AsyncRedisStorage(engine,
192                       discovery,
193                       pId,
194                       namespaceConfigurations,
195                       ::asyncCommandDispatcherCreator,
196                       std::make_shared<redis::ContentsBuilder>(SEPARATOR),
197                       logger)
198 {
199 }
200
201 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
202                                      std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
203                                      const boost::optional<PublisherId>& pId,
204                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
205                                      const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
206                                      std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
207                                      std::shared_ptr<Logger> logger):
208     engine(engine),
209     dispatcher(nullptr),
210     discovery(discovery),
211     publisherId(pId),
212     asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
213     contentsBuilder(contentsBuilder),
214     namespaceConfigurations(namespaceConfigurations),
215     logger(logger)
216 {
217     if(publisherId && (*publisherId).empty())
218     {
219         throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
220     }
221
222     discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
223                                  {
224                                      serviceStateChanged(databaseInfo);
225                                  });
226 }
227
228 AsyncRedisStorage::~AsyncRedisStorage()
229 {
230     if (discovery)
231         discovery->clearStateChangedCb();
232     if (dispatcher)
233         dispatcher->disableCommandCallbacks();
234 }
235
236 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
237 {
238     return dbInfo;
239 }
240
241 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
242 {
243     dispatcher = asyncCommandDispatcherCreator(*engine,
244                                                newDatabaseInfo,
245                                                contentsBuilder,
246                                                logger);
247     if (readyAck)
248         dispatcher->waitConnectedAsync([this]()
249                                        {
250                                            readyAck(std::error_code());
251                                            readyAck = ReadyAck();
252                                        });
253     dbInfo = newDatabaseInfo;
254 }
255
256 int AsyncRedisStorage::fd() const
257 {
258     return engine->fd();
259 }
260
261 void AsyncRedisStorage::handleEvents()
262 {
263     engine->handleEvents();
264 }
265
266 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
267                                                 boost::optional<bool> noKeysGiven,
268                                                 std::error_code& ecToReturn)
269 {
270     if (!::isValidNamespace(ns))
271     {
272         logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
273         ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
274         return false;
275     }
276     if (noKeysGiven && *noKeysGiven)
277     {
278         ecToReturn = std::error_code();
279         return false;
280     }
281     if (!dispatcher)
282     {
283         ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
284         return false;
285     }
286
287     ecToReturn = std::error_code();
288     return true;
289 }
290
291 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
292                                        const ReadyAck& readyAck)
293 {
294     if (dispatcher)
295         dispatcher->waitConnectedAsync([readyAck]()
296                                        {
297                                            readyAck(std::error_code());
298                                        });
299     else
300         this->readyAck = readyAck;
301 }
302
303 void AsyncRedisStorage::setAsync(const Namespace& ns,
304                                  const DataMap& dataMap,
305                                  const ModifyAck& modifyAck)
306 {
307     std::error_code ec;
308
309     if (!canOperationBePerformed(ns, dataMap.empty(), ec))
310     {
311         engine->postCallback(std::bind(modifyAck, ec));
312         return;
313     }
314
315     if (namespaceConfigurations->areNotificationsEnabled(ns))
316         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
317                                             this,
318                                             std::placeholders::_1,
319                                             std::placeholders::_2,
320                                             modifyAck),
321                                   ns,
322                                   contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
323     else
324         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
325                                             this,
326                                             std::placeholders::_1,
327                                             std::placeholders::_2,
328                                             modifyAck),
329                                   ns,
330                                   contentsBuilder->build("MSET", ns, dataMap));
331 }
332
333 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
334                                                     const Reply&,
335                                                     const ModifyAck& modifyAck )
336 {
337     modifyAck(error);
338 }
339
340 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
341                                                    const Reply& reply,
342                                                    const ModifyIfAck& modifyIfAck)
343 {
344     auto type(reply.getType());
345     if (error ||
346         (type == Reply::Type::NIL) || // SETIE(PUB)
347         ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
348         modifyIfAck(error, false);
349     else
350         modifyIfAck(error, true);
351 }
352
353 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
354                                    const Key& key,
355                                    const Data& oldData,
356                                    const Data& newData,
357                                    const ModifyIfAck& modifyIfAck)
358 {
359     std::error_code ec;
360
361     if (!canOperationBePerformed(ns, boost::none, ec))
362     {
363         engine->postCallback(std::bind(modifyIfAck, ec, false));
364         return;
365     }
366
367     if (namespaceConfigurations->areNotificationsEnabled(ns))
368         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
369                                             this,
370                                             std::placeholders::_1,
371                                             std::placeholders::_2,
372                                             modifyIfAck),
373                                   ns,
374                                   contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
375     else
376         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
377                                             this,
378                                             std::placeholders::_1,
379                                             std::placeholders::_2,
380                                             modifyIfAck),
381                                   ns,
382                                   contentsBuilder->build("SETIE", ns, key, newData, oldData));
383 }
384
385 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
386                                       const Key& key,
387                                       const Data& data,
388                                       const ModifyIfAck& modifyIfAck)
389 {
390     std::error_code ec;
391
392     if (!canOperationBePerformed(ns, boost::none, ec))
393     {
394         engine->postCallback(std::bind(modifyIfAck, ec, false));
395         return;
396     }
397
398     if (namespaceConfigurations->areNotificationsEnabled(ns))
399         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
400                                             this,
401                                             std::placeholders::_1,
402                                             std::placeholders::_2,
403                                             modifyIfAck),
404                                   ns,
405                                   contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
406     else
407         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
408                                             this,
409                                             std::placeholders::_1,
410                                             std::placeholders::_2,
411                                             modifyIfAck),
412                                   ns,
413                                   contentsBuilder->build("DELIE", ns, key, data));
414 }
415
416 std::string AsyncRedisStorage::getPublishMessage() const
417 {
418     if(publisherId)
419         return *publisherId;
420     else
421         return NO_PUBLISHER;
422 }
423
424 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
425                                             const Key& key,
426                                             const Data& data,
427                                             const ModifyIfAck& modifyIfAck)
428 {
429     std::error_code ec;
430
431     if (!canOperationBePerformed(ns, boost::none, ec))
432     {
433         engine->postCallback(std::bind(modifyIfAck, ec, false));
434         return;
435     }
436
437     if (namespaceConfigurations->areNotificationsEnabled(ns))
438         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
439                                             this,
440                                             std::placeholders::_1,
441                                             std::placeholders::_2,
442                                             modifyIfAck),
443                                   ns,
444                                   contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
445     else
446         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
447                                             this,
448                                             std::placeholders::_1,
449                                             std::placeholders::_2,
450                                             modifyIfAck),
451                                   ns,
452                                   contentsBuilder->build("SETNX", ns, key, data));
453 }
454
455 void AsyncRedisStorage::getAsync(const Namespace& ns,
456                                  const Keys& keys,
457                                  const GetAck& getAck)
458 {
459     std::error_code ec;
460
461     if (!canOperationBePerformed(ns, keys.empty(), ec))
462     {
463         engine->postCallback(std::bind(getAck, ec, DataMap()));
464         return;
465     }
466
467     dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
468                                                  const Reply& reply)
469                               {
470                                   if (error)
471                                       getAck(error, DataMap());
472                                   else
473                                       getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
474                               },
475                               ns,
476                               contentsBuilder->build("MGET", ns, keys));
477 }
478
479 void AsyncRedisStorage::removeAsync(const Namespace& ns,
480                                     const Keys& keys,
481                                     const ModifyAck& modifyAck)
482 {
483     std::error_code ec;
484
485     if (!canOperationBePerformed(ns, keys.empty(), ec))
486     {
487         engine->postCallback(std::bind(modifyAck, ec));
488         return;
489     }
490
491     if (namespaceConfigurations->areNotificationsEnabled(ns))
492         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
493                                             this,
494                                             std::placeholders::_1,
495                                             std::placeholders::_2,
496                                             modifyAck),
497                                   ns,
498                                   contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
499     else
500         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
501                                             this,
502                                             std::placeholders::_1,
503                                             std::placeholders::_2,
504                                             modifyAck),
505                                   ns,
506                                   contentsBuilder->build("DEL", ns, keys));
507 }
508
509 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
510                                       const std::string& keyPrefix,
511                                       const FindKeysAck& findKeysAck)
512 {
513     //TODO: update to more optimal solution than current KEYS-based one.
514     std::error_code ec;
515
516     if (!canOperationBePerformed(ns, boost::none, ec))
517     {
518         engine->postCallback(std::bind(findKeysAck, ec, Keys()));
519         return;
520     }
521
522     dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
523                               {
524                                   if (error)
525                                       findKeysAck(error, Keys());
526                                   else
527                                       findKeysAck(std::error_code(), getKeys(*reply.getArray()));
528                               },
529                               ns,
530                               contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
531 }
532
533 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
534                                        const ModifyAck& modifyAck)
535 {
536     std::error_code ec;
537
538     if (!canOperationBePerformed(ns, boost::none, ec))
539     {
540         engine->postCallback(std::bind(modifyAck, ec));
541         return;
542     }
543
544     dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
545                               {
546                                   if (error)
547                                   {
548                                       modifyAck(error);
549                                       return;
550                                   }
551                                   const auto& array(*reply.getArray());
552                                   if (array.empty())
553                                       modifyAck(std::error_code());
554                                   else
555                                   {
556                                       removeAsync(ns, getKeys(array), modifyAck);
557                                   }
558                               },
559                               ns,
560                               contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
561 }
562
563 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
564 {
565     std::string escapedKeyPrefix = keyPrefix;
566     escapeRedisSearchPatternCharacters(escapedKeyPrefix);
567     std::ostringstream oss;
568     oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";
569     return oss.str();
570 }