05e282735110bb96e91bca1ee4f733d02f61850a
[ric-plt/sdl.git] / src / redis / hiredisclusterepolladapter.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include "private/redis/hiredisclusterepolladapter.hpp"
23 #include <sys/epoll.h>
24 #include "private/engine.hpp"
25 #include "private/redis/hiredisclustersystem.hpp"
26
27 using namespace shareddatalayer;
28 using namespace shareddatalayer::redis;
29
30 namespace
31 {
32     int attachFunction(redisAsyncContext* ac, void* data)
33     {
34         auto instance(static_cast<HiredisClusterEpollAdapter*>(data));
35         instance->attach(ac);
36         return REDIS_OK;
37     }
38
39     void addReadWrap(void* data)
40     {
41         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
42         instance->addRead();
43     }
44
45     void addWriteWrap(void* data)
46     {
47         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
48         instance->addWrite();
49     }
50
51     void delReadWrap(void* data)
52     {
53         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
54         instance->delRead();
55     }
56
57     void delWriteWrap(void* data)
58     {
59         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
60         instance->delWrite();
61     }
62
63     void cleanupWrap(void* data)
64     {
65         auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
66         instance->cleanup();
67     }
68 }
69
70 HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine):
71     HiredisClusterEpollAdapter(engine, HiredisClusterSystem::getInstance())
72 {
73 }
74
75 HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine, HiredisClusterSystem& hiredisClusterSystem):
76     engine(engine),
77     hiredisClusterSystem(hiredisClusterSystem)
78 {
79 }
80
81 void HiredisClusterEpollAdapter::setup(redisClusterAsyncContext* acc)
82 {
83     acc->adapter = this;
84     acc->attach_fn = attachFunction;
85 }
86
87 void HiredisClusterEpollAdapter::attach(redisAsyncContext* ac)
88 {
89     detach(ac);
90     nodes.insert(std::make_pair(ac->c.fd,
91                                 std::unique_ptr<Node>(new Node(engine,
92                                                                ac,
93                                                                hiredisClusterSystem))));
94 }
95
96 void HiredisClusterEpollAdapter::detach(const redisAsyncContext* ac)
97 {
98     auto it = nodes.find(ac->c.fd);
99     if (it != nodes.end())
100         nodes.erase(it);
101 }
102
103 HiredisClusterEpollAdapter::Node::Node(Engine& engine,
104                                        redisAsyncContext* ac,
105                                        HiredisClusterSystem& hiredisClusterSystem):
106     engine(engine),
107     hiredisClusterSystem(hiredisClusterSystem),
108     ac(ac),
109     eventState(0),
110     reading(false),
111     writing(false)
112 {
113     this->ac->ev.data = this;
114     this->ac->ev.addRead = addReadWrap;
115     this->ac->ev.addWrite = addWriteWrap;
116     this->ac->ev.delRead = delReadWrap;
117     this->ac->ev.delWrite = delWriteWrap;
118     this->ac->ev.cleanup = cleanupWrap;
119     engine.addMonitoredFD(ac->c.fd,
120                           eventState,
121                           std::bind(&HiredisClusterEpollAdapter::Node::eventHandler,
122                                     this,
123                                     std::placeholders::_1));
124     isMonitoring = true;
125 }
126
127 HiredisClusterEpollAdapter::Node::~Node()
128 {
129     if (isMonitoring)
130         cleanup();
131 }
132
133 void HiredisClusterEpollAdapter::Node::eventHandler(unsigned int events)
134 {
135     if (events & Engine::EVENT_IN)
136         if (reading && isMonitoring)
137             hiredisClusterSystem.redisAsyncHandleRead(ac);
138     if (events & Engine::EVENT_OUT)
139         if (writing && isMonitoring)
140             hiredisClusterSystem.redisAsyncHandleWrite(ac);
141 }
142
143 void HiredisClusterEpollAdapter::Node::addRead()
144 {
145     if (reading)
146         return;
147     reading = true;
148     eventState |= Engine::EVENT_IN;
149     engine.modifyMonitoredFD(ac->c.fd, eventState);
150 }
151
152 void HiredisClusterEpollAdapter::Node::addWrite()
153 {
154     if (writing)
155         return;
156     writing = true;
157     eventState |= Engine::EVENT_OUT;
158     engine.modifyMonitoredFD(ac->c.fd, eventState);
159 }
160
161 void HiredisClusterEpollAdapter::Node::delRead()
162 {
163     reading = false;
164     eventState &= ~Engine::EVENT_IN;
165     engine.modifyMonitoredFD(ac->c.fd, eventState);
166 }
167
168 void HiredisClusterEpollAdapter::Node::delWrite()
169 {
170     writing = false;
171     eventState &= ~Engine::EVENT_OUT;
172     engine.modifyMonitoredFD(ac->c.fd, eventState);
173 }
174
175 void HiredisClusterEpollAdapter::Node::cleanup()
176 {
177     reading = false;
178     writing = false;
179     eventState = 0;
180     engine.deleteMonitoredFD(ac->c.fd);
181     isMonitoring = false;
182 }