2 Copyright (c) 2018-2019 Nokia.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #ifndef SHAREDDATALAYER_REDIS_ASYNCHIREDISCLUSTERCOMMANDDISPATCHER_HPP_
23 #define SHAREDDATALAYER_REDIS_ASYNCHIREDISCLUSTERCOMMANDDISPATCHER_HPP_
25 #include "private/redis/asynccommanddispatcher.hpp"
26 #include "private/databaseconfiguration.hpp"
27 #include "private/logger.hpp"
28 #include "private/timer.hpp"
36 #include <boost/optional.hpp>
41 struct redisClusterAsyncContext;
42 struct redisAsyncContext;
45 namespace shareddatalayer
51 class HiredisClusterSystem;
52 class HiredisClusterEpollAdapter;
55 class AsyncHiredisClusterCommandDispatcher: public AsyncCommandDispatcher
58 AsyncHiredisClusterCommandDispatcher(const AsyncHiredisClusterCommandDispatcher&) = delete;
60 AsyncHiredisClusterCommandDispatcher& operator = (const AsyncHiredisClusterCommandDispatcher&) = delete;
62 AsyncHiredisClusterCommandDispatcher(Engine& engine,
63 const boost::optional<std::string>& ns,
64 const DatabaseConfiguration::Addresses& addresses,
65 std::shared_ptr<ContentsBuilder> contentsBuilder,
66 bool usePermanentCommandCallbacks,
67 std::shared_ptr<Logger> logger);
69 AsyncHiredisClusterCommandDispatcher(Engine& engine,
70 const boost::optional<std::string>& ns,
71 const DatabaseConfiguration::Addresses& addresses,
72 std::shared_ptr<ContentsBuilder> contentsBuilder,
73 bool usePermanentCommandCallbacks,
74 HiredisClusterSystem& hiredisClusterSystem,
75 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
76 std::shared_ptr<Logger> logger);
78 ~AsyncHiredisClusterCommandDispatcher() override;
80 void waitConnectedAsync(const ConnectAck& connectAck) override;
82 void registerDisconnectCb(const DisconnectCb& disconnectCb) override;
84 void dispatchAsync(const CommandCb& commandCb, const AsyncConnection::Namespace& ns, const Contents& contents) override;
86 void disableCommandCallbacks() override;
88 void handleReply(const CommandCb& commandCb, const std::error_code& error, const redisReply* rr);
90 bool isClientCallbacksEnabled() const;
92 void handleDisconnect(const redisAsyncContext* ac);
95 enum class ServiceState
101 using Callback = std::function<void(const Reply&)>;
104 const boost::optional<std::string> initialNamespace;
105 const DatabaseConfiguration::Addresses addresses;
106 std::shared_ptr<ContentsBuilder> contentsBuilder;
107 bool usePermanentCommandCallbacks;
108 HiredisClusterSystem& hiredisClusterSystem;
109 std::shared_ptr<HiredisClusterEpollAdapter> adapter;
110 redisClusterAsyncContext* acc;
111 ConnectAck connectAck;
112 DisconnectCb disconnectCallback;
113 ServiceState serviceState;
114 std::list<CommandCb> cbs;
115 bool clientCallbacksEnabled;
116 Timer connectionRetryTimer;
117 Timer::Duration connectionRetryTimerDuration;
118 std::shared_ptr<Logger> logger;
122 bool isValidCb(const CommandCb& commandCb);
124 void removeCb(const CommandCb& commandCb);
126 void callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error);
128 void dispatchAsync(const CommandCb& commandCb, const AsyncConnection::Namespace& ns, const Contents& contents, bool checkConnectionState);
130 void verifyConnection();
132 void verifyConnectionReply(const std::error_code& error, const redis::Reply& reply);
136 void armConnectionRetryTimer();
138 void disconnectHiredisCluster();