Add Redis Sentinel based database discovery
[ric-plt/sdl.git] / src / redis / asyncredisstorage.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "config.h"
18 #include <sstream>
19 #include "private/error.hpp"
20 #include <sdl/emptynamespace.hpp>
21 #include <sdl/invalidnamespace.hpp>
22 #include <sdl/publisherid.hpp>
23 #include "private/abort.hpp"
24 #include "private/createlogger.hpp"
25 #include "private/engine.hpp"
26 #include "private/logger.hpp"
27 #include "private/namespacevalidator.hpp"
28 #include "private/configurationreader.hpp"
29 #include "private/redis/asynccommanddispatcher.hpp"
30 #include "private/redis/asyncdatabasediscovery.hpp"
31 #include "private/redis/asyncredisstorage.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/redisgeneral.hpp"
35 #include "private/redis/reply.hpp"
36
37 using namespace shareddatalayer;
38 using namespace shareddatalayer::redis;
39
40 /*  TODO: This implementation contains lot of duplicated code with old API (asyncRedisConnection).
41  *  When this new API is fully ready and tested old API implementation could be changed to utilize this
42  *  (bit like sync API utilizes async API).
43  */
44
45 namespace
46 {
47     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
48                                                                           const DatabaseInfo& databaseInfo,
49                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
50                                                                           std::shared_ptr<Logger> logger)
51     {
52         return AsyncCommandDispatcher::create(engine,
53                                               databaseInfo,
54                                               contentsBuilder,
55                                               false,
56                                               logger,
57                                               false);
58     }
59
60     class AsyncRedisStorageErrorCategory: public std::error_category
61     {
62     public:
63         AsyncRedisStorageErrorCategory() = default;
64
65         const char* name() const noexcept override;
66
67         std::string message(int condition) const override;
68
69         std::error_condition default_error_condition(int condition) const noexcept override;
70     };
71
72     const char* AsyncRedisStorageErrorCategory::name() const noexcept
73     {
74         return "asyncredisstorage";
75     }
76
77     std::string AsyncRedisStorageErrorCategory::message(int condition) const
78     {
79         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
80         {
81             case AsyncRedisStorage::ErrorCode::SUCCESS:
82                 return std::error_code().message();
83             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
84                 return "connection to the underlying data storage not yet available";
85             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
86                 return "invalid namespace identifier passed to SDL API";
87             case AsyncRedisStorage::ErrorCode::END_MARKER:
88                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
89                 return "unsupported error code for message()";
90             default:
91                 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
92         }
93     }
94
95     std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
96     {
97         switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
98         {
99             case AsyncRedisStorage::ErrorCode::SUCCESS:
100                 return InternalError::SUCCESS;
101             case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
102                 return InternalError::SDL_NOT_READY;
103             case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
104                 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
105             case AsyncRedisStorage::ErrorCode::END_MARKER:
106                 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
107                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
108             default:
109                 std::ostringstream msg;
110                 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
111                     << condition;
112                 logErrorOnce(msg.str());
113                 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
114         }
115     }
116
117     AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
118     {
119         AsyncStorage::DataMap dataMap;
120         auto i(0U);
121         for (const auto& j : keys)
122         {
123             if (replyVector[i]->getType() == Reply::Type::STRING)
124             {
125                 AsyncStorage::Data data;
126                 auto dataStr(replyVector[i]->getString());
127                 for (ReplyStringLength k(0); k < dataStr->len; ++k)
128                     data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
129                 dataMap.insert({ j, data });
130             }
131             ++i;
132         }
133         return dataMap;
134     }
135
136     AsyncStorage::Key getKey(const Reply::DataItem& item)
137     {
138         std::string str(item.str.c_str(), static_cast<size_t>(item.len));
139         auto res(str.find(AsyncRedisStorage::SEPARATOR));
140         return str.substr(res + 1);
141     }
142
143     AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
144     {
145         AsyncStorage::Keys keys;
146         for (const auto& i : replyVector)
147         {
148             if (i->getType() == Reply::Type::STRING)
149                 keys.insert(getKey(*i->getString()));
150         }
151         return keys;
152     }
153
154     void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
155     {
156         const std::string redisSearchPatternCharacters = R"(*?[]\)";
157
158         std::size_t foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters);
159
160         while (foundPosition != std::string::npos)
161         {
162             stringToProcess.insert(foundPosition, R"(\)");
163             foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters, foundPosition + 2);
164         }
165     }
166 }
167
168 AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
169 {
170     if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
171         throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
172     ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
173     return ecEnum;
174 }
175
176 std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
177 {
178     return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
179 }
180
181 const std::error_category& AsyncRedisStorage::errorCategory() noexcept
182 {
183     static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
184     return theAsyncRedisStorageErrorCategory;
185 }
186
187 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
188                                      std::shared_ptr<AsyncDatabaseDiscovery> discovery,
189                                      const boost::optional<PublisherId>& pId,
190                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
191                                      std::shared_ptr<Logger> logger):
192     AsyncRedisStorage(engine,
193                       discovery,
194                       pId,
195                       namespaceConfigurations,
196                       ::asyncCommandDispatcherCreator,
197                       std::make_shared<redis::ContentsBuilder>(SEPARATOR),
198                       logger)
199 {
200 }
201
202 AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
203                                      std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
204                                      const boost::optional<PublisherId>& pId,
205                                      std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
206                                      const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
207                                      std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
208                                      std::shared_ptr<Logger> logger):
209     engine(engine),
210     dispatcher(nullptr),
211     discovery(discovery),
212     publisherId(pId),
213     asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
214     contentsBuilder(contentsBuilder),
215     namespaceConfigurations(namespaceConfigurations),
216     logger(logger)
217 {
218     if(publisherId && (*publisherId).empty())
219     {
220         throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
221     }
222
223     discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
224                                  {
225                                      serviceStateChanged(databaseInfo);
226                                  });
227 }
228
229 AsyncRedisStorage::~AsyncRedisStorage()
230 {
231     if (discovery)
232         discovery->clearStateChangedCb();
233     if (dispatcher)
234         dispatcher->disableCommandCallbacks();
235 }
236
237 redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
238 {
239     return dbInfo;
240 }
241
242 void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
243 {
244     dispatcher = asyncCommandDispatcherCreator(*engine,
245                                                newDatabaseInfo,
246                                                contentsBuilder,
247                                                logger);
248     if (readyAck)
249         dispatcher->waitConnectedAsync([this]()
250                                        {
251                                            readyAck(std::error_code());
252                                            readyAck = ReadyAck();
253                                        });
254     dbInfo = newDatabaseInfo;
255 }
256
257 int AsyncRedisStorage::fd() const
258 {
259     return engine->fd();
260 }
261
262 void AsyncRedisStorage::handleEvents()
263 {
264     engine->handleEvents();
265 }
266
267 bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
268                                                 boost::optional<bool> noKeysGiven,
269                                                 std::error_code& ecToReturn)
270 {
271     if (!::isValidNamespace(ns))
272     {
273         logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
274         ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
275         return false;
276     }
277     if (noKeysGiven && *noKeysGiven)
278     {
279         ecToReturn = std::error_code();
280         return false;
281     }
282     if (!dispatcher)
283     {
284         ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
285         return false;
286     }
287
288     ecToReturn = std::error_code();
289     return true;
290 }
291
292 void AsyncRedisStorage::waitReadyAsync(const Namespace&,
293                                        const ReadyAck& readyAck)
294 {
295     if (dispatcher)
296         dispatcher->waitConnectedAsync([readyAck]()
297                                        {
298                                            readyAck(std::error_code());
299                                        });
300     else
301         this->readyAck = readyAck;
302 }
303
304 void AsyncRedisStorage::setAsync(const Namespace& ns,
305                                  const DataMap& dataMap,
306                                  const ModifyAck& modifyAck)
307 {
308     std::error_code ec;
309
310     if (!canOperationBePerformed(ns, dataMap.empty(), ec))
311     {
312         engine->postCallback(std::bind(modifyAck, ec));
313         return;
314     }
315
316     if (namespaceConfigurations->areNotificationsEnabled(ns))
317         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
318                                             this,
319                                             std::placeholders::_1,
320                                             std::placeholders::_2,
321                                             modifyAck),
322                                   ns,
323                                   contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
324     else
325         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
326                                             this,
327                                             std::placeholders::_1,
328                                             std::placeholders::_2,
329                                             modifyAck),
330                                   ns,
331                                   contentsBuilder->build("MSET", ns, dataMap));
332 }
333
334 void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
335                                                     const Reply&,
336                                                     const ModifyAck& modifyAck )
337 {
338     modifyAck(error);
339 }
340
341 void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
342                                                    const Reply& reply,
343                                                    const ModifyIfAck& modifyIfAck)
344 {
345     auto type(reply.getType());
346     if (error ||
347         (type == Reply::Type::NIL) || // SETIE(PUB)
348         ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
349         modifyIfAck(error, false);
350     else
351         modifyIfAck(error, true);
352 }
353
354 void AsyncRedisStorage::setIfAsync(const Namespace& ns,
355                                    const Key& key,
356                                    const Data& oldData,
357                                    const Data& newData,
358                                    const ModifyIfAck& modifyIfAck)
359 {
360     std::error_code ec;
361
362     if (!canOperationBePerformed(ns, boost::none, ec))
363     {
364         engine->postCallback(std::bind(modifyIfAck, ec, false));
365         return;
366     }
367
368     if (namespaceConfigurations->areNotificationsEnabled(ns))
369         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
370                                             this,
371                                             std::placeholders::_1,
372                                             std::placeholders::_2,
373                                             modifyIfAck),
374                                   ns,
375                                   contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
376     else
377         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
378                                             this,
379                                             std::placeholders::_1,
380                                             std::placeholders::_2,
381                                             modifyIfAck),
382                                   ns,
383                                   contentsBuilder->build("SETIE", ns, key, newData, oldData));
384 }
385
386 void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
387                                       const Key& key,
388                                       const Data& data,
389                                       const ModifyIfAck& modifyIfAck)
390 {
391     std::error_code ec;
392
393     if (!canOperationBePerformed(ns, boost::none, ec))
394     {
395         engine->postCallback(std::bind(modifyIfAck, ec, false));
396         return;
397     }
398
399     if (namespaceConfigurations->areNotificationsEnabled(ns))
400         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
401                                             this,
402                                             std::placeholders::_1,
403                                             std::placeholders::_2,
404                                             modifyIfAck),
405                                   ns,
406                                   contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
407     else
408         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
409                                             this,
410                                             std::placeholders::_1,
411                                             std::placeholders::_2,
412                                             modifyIfAck),
413                                   ns,
414                                   contentsBuilder->build("DELIE", ns, key, data));
415 }
416
417 std::string AsyncRedisStorage::getPublishMessage() const
418 {
419     if(publisherId)
420         return *publisherId;
421     else
422         return NO_PUBLISHER;
423 }
424
425 void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
426                                             const Key& key,
427                                             const Data& data,
428                                             const ModifyIfAck& modifyIfAck)
429 {
430     std::error_code ec;
431
432     if (!canOperationBePerformed(ns, boost::none, ec))
433     {
434         engine->postCallback(std::bind(modifyIfAck, ec, false));
435         return;
436     }
437
438     if (namespaceConfigurations->areNotificationsEnabled(ns))
439         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
440                                             this,
441                                             std::placeholders::_1,
442                                             std::placeholders::_2,
443                                             modifyIfAck),
444                                   ns,
445                                   contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
446     else
447         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
448                                             this,
449                                             std::placeholders::_1,
450                                             std::placeholders::_2,
451                                             modifyIfAck),
452                                   ns,
453                                   contentsBuilder->build("SETNX", ns, key, data));
454 }
455
456 void AsyncRedisStorage::getAsync(const Namespace& ns,
457                                  const Keys& keys,
458                                  const GetAck& getAck)
459 {
460     std::error_code ec;
461
462     if (!canOperationBePerformed(ns, keys.empty(), ec))
463     {
464         engine->postCallback(std::bind(getAck, ec, DataMap()));
465         return;
466     }
467
468     dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
469                                                  const Reply& reply)
470                               {
471                                   if (error)
472                                       getAck(error, DataMap());
473                                   else
474                                       getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
475                               },
476                               ns,
477                               contentsBuilder->build("MGET", ns, keys));
478 }
479
480 void AsyncRedisStorage::removeAsync(const Namespace& ns,
481                                     const Keys& keys,
482                                     const ModifyAck& modifyAck)
483 {
484     std::error_code ec;
485
486     if (!canOperationBePerformed(ns, keys.empty(), ec))
487     {
488         engine->postCallback(std::bind(modifyAck, ec));
489         return;
490     }
491
492     if (namespaceConfigurations->areNotificationsEnabled(ns))
493         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
494                                             this,
495                                             std::placeholders::_1,
496                                             std::placeholders::_2,
497                                             modifyAck),
498                                   ns,
499                                   contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
500     else
501         dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
502                                             this,
503                                             std::placeholders::_1,
504                                             std::placeholders::_2,
505                                             modifyAck),
506                                   ns,
507                                   contentsBuilder->build("DEL", ns, keys));
508 }
509
510 void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
511                                       const std::string& keyPrefix,
512                                       const FindKeysAck& findKeysAck)
513 {
514     //TODO: update to more optimal solution than current KEYS-based one.
515     std::error_code ec;
516
517     if (!canOperationBePerformed(ns, boost::none, ec))
518     {
519         engine->postCallback(std::bind(findKeysAck, ec, Keys()));
520         return;
521     }
522
523     dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
524                               {
525                                   if (error)
526                                       findKeysAck(error, Keys());
527                                   else
528                                       findKeysAck(std::error_code(), getKeys(*reply.getArray()));
529                               },
530                               ns,
531                               contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
532 }
533
534 void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
535                                        const ModifyAck& modifyAck)
536 {
537     std::error_code ec;
538
539     if (!canOperationBePerformed(ns, boost::none, ec))
540     {
541         engine->postCallback(std::bind(modifyAck, ec));
542         return;
543     }
544
545     dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
546                               {
547                                   if (error)
548                                   {
549                                       modifyAck(error);
550                                       return;
551                                   }
552                                   const auto& array(*reply.getArray());
553                                   if (array.empty())
554                                       modifyAck(std::error_code());
555                                   else
556                                   {
557                                       removeAsync(ns, getKeys(array), modifyAck);
558                                   }
559                               },
560                               ns,
561                               contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
562 }
563
564 std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
565 {
566     std::string escapedKeyPrefix = keyPrefix;
567     escapeRedisSearchPatternCharacters(escapedKeyPrefix);
568     std::ostringstream oss;
569     oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";
570     return oss.str();
571 }