Add first version
[ric-plt/sdl.git] / src / redis / hiredisclusterepolladapter.cpp
diff --git a/src/redis/hiredisclusterepolladapter.cpp b/src/redis/hiredisclusterepolladapter.cpp
new file mode 100644 (file)
index 0000000..10d3a57
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#include "private/redis/hiredisclusterepolladapter.hpp"
+#include <sys/epoll.h>
+#include "private/engine.hpp"
+#include "private/redis/hiredisclustersystem.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+    int attachFunction(redisAsyncContext* ac, void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter*>(data));
+        instance->attach(ac);
+        return REDIS_OK;
+    }
+
+    void addReadWrap(void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
+        instance->addRead();
+    }
+
+    void addWriteWrap(void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
+        instance->addWrite();
+    }
+
+    void delReadWrap(void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
+        instance->delRead();
+    }
+
+    void delWriteWrap(void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
+        instance->delWrite();
+    }
+
+    void cleanupWrap(void* data)
+    {
+        auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
+        instance->cleanup();
+    }
+}
+
+HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine):
+    HiredisClusterEpollAdapter(engine, HiredisClusterSystem::getInstance())
+{
+}
+
+HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine, HiredisClusterSystem& hiredisClusterSystem):
+    engine(engine),
+    hiredisClusterSystem(hiredisClusterSystem)
+{
+}
+
+void HiredisClusterEpollAdapter::setup(redisClusterAsyncContext* acc)
+{
+    acc->adapter = this;
+    acc->attach_fn = attachFunction;
+}
+
+void HiredisClusterEpollAdapter::attach(redisAsyncContext* ac)
+{
+    detach(ac);
+    nodes.insert(std::make_pair(ac->c.fd,
+                                std::unique_ptr<Node>(new Node(engine,
+                                                               ac,
+                                                               hiredisClusterSystem))));
+}
+
+void HiredisClusterEpollAdapter::detach(const redisAsyncContext* ac)
+{
+    auto it = nodes.find(ac->c.fd);
+    if (it != nodes.end())
+        nodes.erase(it);
+}
+
+HiredisClusterEpollAdapter::Node::Node(Engine& engine,
+                                       redisAsyncContext* ac,
+                                       HiredisClusterSystem& hiredisClusterSystem):
+    engine(engine),
+    hiredisClusterSystem(hiredisClusterSystem),
+    ac(ac),
+    eventState(0),
+    reading(false),
+    writing(false)
+{
+    this->ac->ev.data = this;
+    this->ac->ev.addRead = addReadWrap;
+    this->ac->ev.addWrite = addWriteWrap;
+    this->ac->ev.delRead = delReadWrap;
+    this->ac->ev.delWrite = delWriteWrap;
+    this->ac->ev.cleanup = cleanupWrap;
+    engine.addMonitoredFD(ac->c.fd,
+                          eventState,
+                          std::bind(&HiredisClusterEpollAdapter::Node::eventHandler,
+                                    this,
+                                    std::placeholders::_1));
+    isMonitoring = true;
+}
+
+HiredisClusterEpollAdapter::Node::~Node()
+{
+    if (isMonitoring)
+        cleanup();
+}
+
+void HiredisClusterEpollAdapter::Node::eventHandler(unsigned int events)
+{
+    if (events & Engine::EVENT_IN)
+        if (reading && isMonitoring)
+            hiredisClusterSystem.redisAsyncHandleRead(ac);
+    if (events & Engine::EVENT_OUT)
+        if (writing && isMonitoring)
+            hiredisClusterSystem.redisAsyncHandleWrite(ac);
+}
+
+void HiredisClusterEpollAdapter::Node::addRead()
+{
+    if (reading)
+        return;
+    reading = true;
+    eventState |= Engine::EVENT_IN;
+    engine.modifyMonitoredFD(ac->c.fd, eventState);
+}
+
+void HiredisClusterEpollAdapter::Node::addWrite()
+{
+    if (writing)
+        return;
+    writing = true;
+    eventState |= Engine::EVENT_OUT;
+    engine.modifyMonitoredFD(ac->c.fd, eventState);
+}
+
+void HiredisClusterEpollAdapter::Node::delRead()
+{
+    reading = false;
+    eventState &= ~Engine::EVENT_IN;
+    engine.modifyMonitoredFD(ac->c.fd, eventState);
+}
+
+void HiredisClusterEpollAdapter::Node::delWrite()
+{
+    writing = false;
+    eventState &= ~Engine::EVENT_OUT;
+    engine.modifyMonitoredFD(ac->c.fd, eventState);
+}
+
+void HiredisClusterEpollAdapter::Node::cleanup()
+{
+    reading = false;
+    writing = false;
+    eventState = 0;
+    engine.deleteMonitoredFD(ac->c.fd);
+    isMonitoring = false;
+}