X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fsyncstorageimpl.cpp;fp=src%2Fsyncstorageimpl.cpp;h=76c15467f5026a3d3b016ef598180556c1eac00d;hb=faf9fc79e58fa4ace9b0ef317b741afb2c1a8abe;hp=4ddc723d7e8d9c66d7ed50e64dd9781476801b68;hpb=6e77ef4f748bdfa40505fc11c0a190e7a40fdb46;p=ric-plt%2Fsdl.git diff --git a/src/syncstorageimpl.cpp b/src/syncstorageimpl.cpp index 4ddc723..76c1546 100644 --- a/src/syncstorageimpl.cpp +++ b/src/syncstorageimpl.cpp @@ -19,8 +19,8 @@ * platform project (RICP). */ +#include #include -#include #include #include #include @@ -58,6 +58,10 @@ namespace } } +/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based + * instead of current (bit error prone) poll based implementation. + */ + SyncStorageImpl::SyncStorageImpl(std::unique_ptr asyncStorage): SyncStorageImpl(std::move(asyncStorage), System::getSystem()) { @@ -67,10 +71,17 @@ SyncStorageImpl::SyncStorageImpl(std::unique_ptr pAsyncStorage, System& system): asyncStorage(std::move(pAsyncStorage)), system(system), - pFd(asyncStorage->fd()), localStatus(false), - synced(false) + synced(false), + isReady(false), + events{ asyncStorage->fd(), POLLIN, 0 }, + operationTimeout(std::chrono::steady_clock::duration::zero()) +{ +} + +void SyncStorageImpl::waitReadyAck(const std::error_code&) { + isReady = true; } void SyncStorageImpl::modifyAck(const std::error_code& error) @@ -106,40 +117,62 @@ void SyncStorageImpl::verifyBackendResponse() throwExceptionForErrorCode(localError); } -void SyncStorageImpl::waitForCallback() +void SyncStorageImpl::waitForOperationCallback() { - struct pollfd events { pFd, POLLIN, 0 }; while(!synced) - if (system.poll(&events, 1, -1) > 0 && (events.revents & POLLIN)) - asyncStorage->handleEvents(); + pollAndHandleEvents(NO_TIMEOUT); +} + +void SyncStorageImpl::pollAndHandleEvents(int timeout_ms) +{ + if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN)) + asyncStorage->handleEvents(); +} + +void SyncStorageImpl::waitForReadinessCheckCallback() +{ + if (operationTimeout == std::chrono::steady_clock::duration::zero()) + { + while (!isReady) + pollAndHandleEvents(NO_TIMEOUT); + } + else + { + int pollTimeout_ms = std::chrono::duration_cast(operationTimeout).count() / 10; + std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); + while(!isReady && (std::chrono::steady_clock::now() - start < operationTimeout)) + pollAndHandleEvents(pollTimeout_ms); + } } void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns) { - synced = false; + isReady = false; asyncStorage->waitReadyAsync(ns, - std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, + std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck, this, std::error_code())); - waitForCallback(); - verifyBackendResponse(); + waitForReadinessCheckCallback(); } void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; + asyncStorage->setAsync(ns, dataMap, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfAsync(ns, @@ -150,13 +183,14 @@ bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& old this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfNotExistsAsync(ns, @@ -166,13 +200,14 @@ bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->getAsync(ns, @@ -181,13 +216,14 @@ SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& k this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localMap; } void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAsync(ns, @@ -195,12 +231,13 @@ void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeIfAsync(ns, @@ -210,13 +247,14 @@ bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->findKeysAsync(ns, @@ -225,19 +263,36 @@ SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std:: this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localKeys; } void SyncStorageImpl::removeAll(const Namespace& ns) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAllAsync(ns, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } + +void SyncStorageImpl::handlePendingEvents() +{ + int pollRetVal = system.poll(&events, 1, 0); + + while (pollRetVal > 0 && events.revents & POLLIN) + { + asyncStorage->handleEvents(); + pollRetVal = system.poll(&events, 1, 0); + } +} + +void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout) +{ + operationTimeout = timeout; +}