RIC:1060: Change in PTL
[ric-plt/sdl.git] / src / asyncstorageimpl.cpp
index 59d7b22..0d6d683 100644 (file)
    limitations under the License.
 */
 
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+*/
+
 #include "config.h"
 #include "private/error.hpp"
 #include "private/abort.hpp"
 #include "private/redis/asyncredisstorage.hpp"
 #endif
 
+#include <boost/optional/optional_io.hpp>
+#include <boost/crc.hpp>
+
 using namespace shareddatalayer;
 using namespace shareddatalayer::redis;
 
 namespace
 {
         std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+                                                                              const std::string& ns,
                                                                               const DatabaseConfiguration& databaseConfiguration,
+                                                                              const boost::optional<std::size_t>& addressIndex,
                                                                               std::shared_ptr<Logger> logger)
         {
             return AsyncDatabaseDiscovery::create(engine,
-                                                  boost::none,
+                                                  ns,
                                                   databaseConfiguration,
+                                                  addressIndex,
                                                   logger);
         }
+
+        std::uint32_t crc32(const std::string& s)
+        {
+           boost::crc_32_type result;
+           result.process_bytes(s.data(), s.size());
+           return result.checksum();
+        }
+
+        std::uint32_t getClusterHashIndex(const std::string& s, const size_t count)
+        {
+            return crc32(s)%count;
+        }
 }
 
 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
@@ -73,19 +96,62 @@ AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
 {
 }
 
-AsyncStorage& AsyncStorageImpl::getRedisHandler()
+void AsyncStorageImpl::setAsyncRedisStorageHandlersForCluster(const std::string& ns)
+{
+    static auto serverCount = databaseConfiguration->getServerAddresses().size();
+    for (std::size_t addrIndex = 0; addrIndex < serverCount; addrIndex++)
+    {
+        auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+                                                                asyncDatabaseDiscoveryCreator(
+                                                                        engine,
+                                                                        ns,
+                                                                        std::ref(*databaseConfiguration),
+                                                                        addrIndex,
+                                                                        logger),
+                                                                publisherId,
+                                                                namespaceConfigurations,
+                                                                logger);
+        asyncStorages.push_back(redisHandler);
+    }
+}
+
+void AsyncStorageImpl::setAsyncRedisStorageHandlers(const std::string& ns)
+{
+    if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+        DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+    {
+            setAsyncRedisStorageHandlersForCluster(ns);
+            return;
+    }
+    auto redisHandler = std::make_shared<AsyncRedisStorage>(engine,
+                                                            asyncDatabaseDiscoveryCreator(
+                                                                    engine,
+                                                                    ns,
+                                                                    std::ref(*databaseConfiguration),
+                                                                    boost::none,
+                                                                    logger),
+                                                            publisherId,
+                                                            namespaceConfigurations,
+                                                            logger);
+    asyncStorages.push_back(redisHandler);
+}
+
+AsyncStorage& AsyncStorageImpl::getAsyncRedisStorageHandler(const std::string& ns)
+{
+    std::size_t handlerIndex{0};
+    if (DatabaseConfiguration::DbType::SDL_STANDALONE_CLUSTER == databaseConfiguration->getDbType() ||
+        DatabaseConfiguration::DbType::SDL_SENTINEL_CLUSTER == databaseConfiguration->getDbType())
+        handlerIndex = getClusterHashIndex(ns, databaseConfiguration->getServerAddresses().size());
+    return *asyncStorages.at(handlerIndex);
+}
+
+AsyncStorage& AsyncStorageImpl::getRedisHandler(const std::string& ns)
 {
 #if HAVE_REDIS
-    static AsyncRedisStorage redisHandler{engine,
-                                          asyncDatabaseDiscoveryCreator(
-                                              engine,
-                                              std::ref(*databaseConfiguration),
-                                              logger),
-                                          publisherId,
-                                          namespaceConfigurations,
-                                          logger};
-
-    return redisHandler;
+    if (asyncStorages.empty())
+            setAsyncRedisStorageHandlers(ns);
+
+    return getAsyncRedisStorageHandler(ns);
 #else
     logger->error() << "Redis operations cannot be performed, Redis not enabled";
     SHAREDDATALAYER_ABORT("Invalid configuration.");
@@ -101,7 +167,7 @@ AsyncStorage& AsyncStorageImpl::getDummyHandler()
 AsyncStorage& AsyncStorageImpl::getOperationHandler(const std::string& ns)
 {
     if (namespaceConfigurations->isDbBackendUseEnabled(ns))
-        return getRedisHandler();
+        return getRedisHandler(ns);
 
     return getDummyHandler();
 }
@@ -175,6 +241,13 @@ void AsyncStorageImpl::findKeysAsync(const Namespace& ns,
     getOperationHandler(ns).findKeysAsync(ns, keyPrefix, findKeysAck);
 }
 
+void AsyncStorageImpl::listKeys(const Namespace& ns,
+                                const std::string& pattern,
+                                const FindKeysAck& findKeysAck)
+{
+    getOperationHandler(ns).listKeys(ns, pattern, findKeysAck);
+}
+
 void AsyncStorageImpl::removeAllAsync(const Namespace& ns,
                                        const ModifyAck& modifyAck)
 {