RIC:1060: Change in PTL
[ric-plt/sdl.git] / src / syncstorageimpl.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
#include <algorithm>
#include <chrono>
#include <limits>
#include <sstream>
#include <sdl/asyncstorage.hpp>
#include <sdl/backenderror.hpp>
#include <sdl/errorqueries.hpp>
#include <sdl/invalidnamespace.hpp>
#include <sdl/notconnected.hpp>
#include <sdl/operationinterrupted.hpp>
#include <sdl/rejectedbybackend.hpp>
#include <sdl/rejectedbysdl.hpp>
#include "private/redis/asyncredisstorage.hpp"
#include "private/syncstorageimpl.hpp"
#include "private/system.hpp"
35
36 using namespace shareddatalayer;
37
38 namespace
39 {
40     void throwExceptionForErrorCode[[ noreturn ]](const std::error_code& ec)
41     {
42         if (ec == shareddatalayer::Error::BACKEND_FAILURE)
43             throw BackendError(ec.message());
44         else if (ec == shareddatalayer::Error::NOT_CONNECTED)
45             throw NotConnected(ec.message());
46         else if (ec == shareddatalayer::Error::OPERATION_INTERRUPTED)
47             throw OperationInterrupted(ec.message());
48         else if (ec == shareddatalayer::Error::REJECTED_BY_BACKEND)
49             throw RejectedByBackend(ec.message());
50         else if (ec == AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE)
51             throw InvalidNamespace(ec.message());
52         else if (ec == shareddatalayer::Error::REJECTED_BY_SDL)
53             throw RejectedBySdl(ec.message());
54
55         std::ostringstream os;
56         os << "No corresponding SDL exception found for error code: " << ec.category().name() << " " << ec.value();
57         throw std::range_error(os.str());
58     }
59 }
60
/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
 * instead of the current (somewhat error-prone) poll-based implementation.
 */
64
65 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
66     SyncStorageImpl(std::move(asyncStorage), System::getSystem())
67 {
68 }
69
70 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> pAsyncStorage,
71                                  System& system):
72     asyncStorage(std::move(pAsyncStorage)),
73     system(system),
74     localStatus(false),
75     synced(false),
76     isReady(false),
77     events{ asyncStorage->fd(), POLLIN, 0 },
78     operationTimeout(std::chrono::steady_clock::duration::zero())
79 {
80 }
81
82 void SyncStorageImpl::waitReadyAck(const std::error_code& error)
83 {
84     isReady = true;
85     localError = error;
86 }
87
88 void SyncStorageImpl::modifyAck(const std::error_code& error)
89 {
90     synced = true;
91     localError = error;
92 }
93
94 void SyncStorageImpl::modifyIfAck(const std::error_code& error, bool status)
95 {
96     synced = true;
97     localError = error;
98     localStatus = status;
99 }
100
101 void SyncStorageImpl::getAck(const std::error_code& error, const DataMap& dataMap)
102 {
103     synced = true;
104     localError = error;
105     localMap = dataMap;
106 }
107
108 void SyncStorageImpl::findKeysAck(const std::error_code& error, const Keys& keys)
109 {
110     synced = true;
111     localError = error;
112     localKeys = keys;
113 }
114
115 void SyncStorageImpl::verifyBackendResponse()
116 {
117     if(localError)
118         throwExceptionForErrorCode(localError);
119 }
120
121 void SyncStorageImpl::waitForOperationCallback()
122 {
123     while(!synced)
124         pollAndHandleEvents(NO_TIMEOUT);
125 }
126
127 void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
128 {
129     if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
130         asyncStorage->handleEvents();
131 }
132
133 void SyncStorageImpl::waitForReadinessCheckCallback(const std::chrono::steady_clock::duration& timeout)
134 {
135     if (timeout == std::chrono::steady_clock::duration::zero())
136     {
137         while (!isReady)
138             pollAndHandleEvents(NO_TIMEOUT);
139     }
140     else
141     {
142         auto timeout_ms(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
143         auto pollTimeout_ms(timeout_ms / 10);
144         std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
145         while(!isReady && (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms)))
146             pollAndHandleEvents(pollTimeout_ms);
147     }
148 }
149
150 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
151 {
152     waitSdlToBeReady(ns, operationTimeout);
153 }
154
155 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout)
156 {
157     isReady = false;
158     asyncStorage->waitReadyAsync(ns,
159                                  std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
160                                            this,
161                                            std::placeholders::_1));
162     waitForReadinessCheckCallback(timeout);
163 }
164
165 void SyncStorageImpl::waitReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout)
166 {
167     waitSdlToBeReady(ns, timeout);
168     if(!isReady)
169         throw RejectedBySdl("Timeout, SDL service not ready for the '" + ns + "' namespace");
170     verifyBackendResponse();
171 }
172
173 void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
174 {
175     handlePendingEvents();
176     waitSdlToBeReady(ns);
177     synced = false;
178
179     asyncStorage->setAsync(ns,
180                            dataMap,
181                            std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
182                                      this,
183                                      std::placeholders::_1));
184     waitForOperationCallback();
185     verifyBackendResponse();
186 }
187
188 bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
189 {
190     handlePendingEvents();
191     waitSdlToBeReady(ns);
192     synced = false;
193     asyncStorage->setIfAsync(ns,
194                              key,
195                              oldData,
196                              newData,
197                              std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
198                                        this,
199                                        std::placeholders::_1,
200                                        std::placeholders::_2));
201     waitForOperationCallback();
202     verifyBackendResponse();
203     return localStatus;
204 }
205
206 bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
207 {
208     handlePendingEvents();
209     waitSdlToBeReady(ns);
210     synced = false;
211     asyncStorage->setIfNotExistsAsync(ns,
212                                       key,
213                                       data,
214                                       std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
215                                                 this,
216                                                 std::placeholders::_1,
217                                                 std::placeholders::_2));
218     waitForOperationCallback();
219     verifyBackendResponse();
220     return localStatus;
221 }
222
223 SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
224 {
225     handlePendingEvents();
226     waitSdlToBeReady(ns);
227     synced = false;
228     asyncStorage->getAsync(ns,
229                            keys,
230                            std::bind(&shareddatalayer::SyncStorageImpl::getAck,
231                                      this,
232                                      std::placeholders::_1,
233                                      std::placeholders::_2));
234     waitForOperationCallback();
235     verifyBackendResponse();
236     return localMap;
237 }
238
239 void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
240 {
241     handlePendingEvents();
242     waitSdlToBeReady(ns);
243     synced = false;
244     asyncStorage->removeAsync(ns,
245                               keys,
246                               std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
247                                         this,
248                                         std::placeholders::_1));
249     waitForOperationCallback();
250     verifyBackendResponse();
251 }
252
253 bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
254 {
255     handlePendingEvents();
256     waitSdlToBeReady(ns);
257     synced = false;
258     asyncStorage->removeIfAsync(ns,
259                                 key,
260                                 data,
261                                 std::bind(&shareddatalayer::SyncStorageImpl::modifyIfAck,
262                                           this,
263                                           std::placeholders::_1,
264                                           std::placeholders::_2));
265     waitForOperationCallback();
266     verifyBackendResponse();
267     return localStatus;
268 }
269
270 SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
271 {
272     handlePendingEvents();
273     waitSdlToBeReady(ns);
274     synced = false;
275     asyncStorage->findKeysAsync(ns,
276                                 keyPrefix,
277                                 std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
278                                           this,
279                                           std::placeholders::_1,
280                                           std::placeholders::_2));
281     waitForOperationCallback();
282     verifyBackendResponse();
283     return localKeys;
284 }
285
286 SyncStorageImpl::Keys SyncStorageImpl::listKeys(const Namespace& ns, const std::string& pattern)
287 {
288     handlePendingEvents();
289     waitSdlToBeReady(ns);
290     synced = false;
291     asyncStorage->listKeys(ns,
292                            pattern,
293                            std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck,
294                                      this,
295                                      std::placeholders::_1,
296                                      std::placeholders::_2));
297     waitForOperationCallback();
298     verifyBackendResponse();
299     return localKeys;
300 }
301
302 void SyncStorageImpl::removeAll(const Namespace& ns)
303 {
304     handlePendingEvents();
305     waitSdlToBeReady(ns);
306     synced = false;
307     asyncStorage->removeAllAsync(ns,
308                                  std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
309                                            this,
310                                            std::placeholders::_1));
311     waitForOperationCallback();
312     verifyBackendResponse();
313 }
314
315 void SyncStorageImpl::handlePendingEvents()
316 {
317     int pollRetVal = system.poll(&events, 1, 0);
318
319     while (pollRetVal > 0 && events.revents & POLLIN)
320     {
321         asyncStorage->handleEvents();
322         pollRetVal = system.poll(&events, 1, 0);
323     }
324 }
325
326 void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
327 {
328     operationTimeout = timeout;
329 }