* platform project (RICP).
*/
+#include <chrono>
#include <sstream>
-#include <sys/poll.h>
#include <sdl/asyncstorage.hpp>
#include <sdl/backenderror.hpp>
#include <sdl/errorqueries.hpp>
}
}
+/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
+ * instead of current (bit error prone) poll based implementation.
+ */
+
SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
SyncStorageImpl(std::move(asyncStorage), System::getSystem())
{
System& system):
asyncStorage(std::move(pAsyncStorage)),
system(system),
- pFd(asyncStorage->fd()),
localStatus(false),
- synced(false)
+ synced(false),
+ isReady(false),
+ events{ asyncStorage->fd(), POLLIN, 0 },
+ operationTimeout(std::chrono::steady_clock::duration::zero())
+{
+}
+
+void SyncStorageImpl::waitReadyAck(const std::error_code&)
{
+ isReady = true;
}
void SyncStorageImpl::modifyAck(const std::error_code& error)
throwExceptionForErrorCode(localError);
}
-void SyncStorageImpl::waitForCallback()
+void SyncStorageImpl::waitForOperationCallback()
{
- struct pollfd events { pFd, POLLIN, 0 };
while(!synced)
- if (system.poll(&events, 1, -1) > 0 && (events.revents & POLLIN))
- asyncStorage->handleEvents();
+ pollAndHandleEvents(NO_TIMEOUT);
+}
+
+void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
+{
+ if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
+ asyncStorage->handleEvents();
+}
+
+void SyncStorageImpl::waitForReadinessCheckCallback()
+{
+ if (operationTimeout == std::chrono::steady_clock::duration::zero())
+ {
+ while (!isReady)
+ pollAndHandleEvents(NO_TIMEOUT);
+ }
+ else
+ {
+ int pollTimeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(operationTimeout).count() / 10;
+ std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
+ while(!isReady && (std::chrono::steady_clock::now() - start < operationTimeout))
+ pollAndHandleEvents(pollTimeout_ms);
+ }
}
void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
{
- synced = false;
+ isReady = false;
asyncStorage->waitReadyAsync(ns,
- std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
+ std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
this,
std::error_code()));
- waitForCallback();
- verifyBackendResponse();
+ waitForReadinessCheckCallback();
}
void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
+
asyncStorage->setAsync(ns,
dataMap,
std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
this,
std::placeholders::_1));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
}
bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->setIfAsync(ns,
this,
std::placeholders::_1,
std::placeholders::_2));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
return localStatus;
}
bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->setIfNotExistsAsync(ns,
this,
std::placeholders::_1,
std::placeholders::_2));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
return localStatus;
}
SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->getAsync(ns,
this,
std::placeholders::_1,
std::placeholders::_2));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
return localMap;
}
void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->removeAsync(ns,
std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
this,
std::placeholders::_1));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
}
bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->removeIfAsync(ns,
this,
std::placeholders::_1,
std::placeholders::_2));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
return localStatus;
}
SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->findKeysAsync(ns,
this,
std::placeholders::_1,
std::placeholders::_2));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
return localKeys;
}
void SyncStorageImpl::removeAll(const Namespace& ns)
{
+ handlePendingEvents();
waitSdlToBeReady(ns);
synced = false;
asyncStorage->removeAllAsync(ns,
std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
this,
std::placeholders::_1));
- waitForCallback();
+ waitForOperationCallback();
verifyBackendResponse();
}
+
+void SyncStorageImpl::handlePendingEvents()
+{
+ int pollRetVal = system.poll(&events, 1, 0);
+
+ while (pollRetVal > 0 && events.revents & POLLIN)
+ {
+ asyncStorage->handleEvents();
+ pollRetVal = system.poll(&events, 1, 0);
+ }
+}
+
+void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
+{
+ operationTimeout = timeout;
+}