10d3a57f8c24a290c7393e2d8a9d88c6504d79e0
[ric-plt/sdl.git] / src / redis / hiredisclusterepolladapter.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 #include "private/redis/hiredisclusterepolladapter.hpp"
18 #include <sys/epoll.h>
19 #include "private/engine.hpp"
20 #include "private/redis/hiredisclustersystem.hpp"
21
22 using namespace shareddatalayer;
23 using namespace shareddatalayer::redis;
24
25 namespace
26 {
27     int attachFunction(redisAsyncContext* ac, void* data)
28     {
29         auto instance(static_cast<HiredisClusterEpollAdapter*>(data));
30         instance->attach(ac);
31         return REDIS_OK;
32     }
33
34     void addReadWrap(void* data)
35     {
36         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
37         instance->addRead();
38     }
39
40     void addWriteWrap(void* data)
41     {
42         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
43         instance->addWrite();
44     }
45
46     void delReadWrap(void* data)
47     {
48         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
49         instance->delRead();
50     }
51
52     void delWriteWrap(void* data)
53     {
54         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
55         instance->delWrite();
56     }
57
58     void cleanupWrap(void* data)
59     {
60         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
61         instance->cleanup();
62     }
63 }
64
65 HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine):
66     HiredisClusterEpollAdapter(engine, HiredisClusterSystem::getInstance())
67 {
68 }
69
70 HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine, HiredisClusterSystem& hiredisClusterSystem):
71     engine(engine),
72     hiredisClusterSystem(hiredisClusterSystem)
73 {
74 }
75
76 void HiredisClusterEpollAdapter::setup(redisClusterAsyncContext* acc)
77 {
78     acc->adapter = this;
79     acc->attach_fn = attachFunction;
80 }
81
82 void HiredisClusterEpollAdapter::attach(redisAsyncContext* ac)
83 {
84     detach(ac);
85     nodes.insert(std::make_pair(ac->c.fd,
86                                 std::unique_ptr<Node>(new Node(engine,
87                                                                ac,
88                                                                hiredisClusterSystem))));
89 }
90
91 void HiredisClusterEpollAdapter::detach(const redisAsyncContext* ac)
92 {
93     auto it = nodes.find(ac->c.fd);
94     if (it != nodes.end())
95         nodes.erase(it);
96 }
97
98 HiredisClusterEpollAdapter::Node::Node(Engine& engine,
99                                        redisAsyncContext* ac,
100                                        HiredisClusterSystem& hiredisClusterSystem):
101     engine(engine),
102     hiredisClusterSystem(hiredisClusterSystem),
103     ac(ac),
104     eventState(0),
105     reading(false),
106     writing(false)
107 {
108     this->ac->ev.data = this;
109     this->ac->ev.addRead = addReadWrap;
110     this->ac->ev.addWrite = addWriteWrap;
111     this->ac->ev.delRead = delReadWrap;
112     this->ac->ev.delWrite = delWriteWrap;
113     this->ac->ev.cleanup = cleanupWrap;
114     engine.addMonitoredFD(ac->c.fd,
115                           eventState,
116                           std::bind(&HiredisClusterEpollAdapter::Node::eventHandler,
117                                     this,
118                                     std::placeholders::_1));
119     isMonitoring = true;
120 }
121
122 HiredisClusterEpollAdapter::Node::~Node()
123 {
124     if (isMonitoring)
125         cleanup();
126 }
127
128 void HiredisClusterEpollAdapter::Node::eventHandler(unsigned int events)
129 {
130     if (events & Engine::EVENT_IN)
131         if (reading && isMonitoring)
132             hiredisClusterSystem.redisAsyncHandleRead(ac);
133     if (events & Engine::EVENT_OUT)
134         if (writing && isMonitoring)
135             hiredisClusterSystem.redisAsyncHandleWrite(ac);
136 }
137
138 void HiredisClusterEpollAdapter::Node::addRead()
139 {
140     if (reading)
141         return;
142     reading = true;
143     eventState |= Engine::EVENT_IN;
144     engine.modifyMonitoredFD(ac->c.fd, eventState);
145 }
146
147 void HiredisClusterEpollAdapter::Node::addWrite()
148 {
149     if (writing)
150         return;
151     writing = true;
152     eventState |= Engine::EVENT_OUT;
153     engine.modifyMonitoredFD(ac->c.fd, eventState);
154 }
155
156 void HiredisClusterEpollAdapter::Node::delRead()
157 {
158     reading = false;
159     eventState &= ~Engine::EVENT_IN;
160     engine.modifyMonitoredFD(ac->c.fd, eventState);
161 }
162
163 void HiredisClusterEpollAdapter::Node::delWrite()
164 {
165     writing = false;
166     eventState &= ~Engine::EVENT_OUT;
167     engine.modifyMonitoredFD(ac->c.fd, eventState);
168 }
169
170 void HiredisClusterEpollAdapter::Node::cleanup()
171 {
172     reading = false;
173     writing = false;
174     eventState = 0;
175     engine.deleteMonitoredFD(ac->c.fd);
176     isMonitoring = false;
177 }