X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fsyncstorageimpl.cpp;h=19f0e547d8e8e329c5889d63c1ee0f9d549b7b92;hb=63869e10ac4d8572238989e1b582c0314da91f9c;hp=4ddc723d7e8d9c66d7ed50e64dd9781476801b68;hpb=a0745d294dcd72f7d78ea4c3accd3b477dd668a5;p=ric-plt%2Fsdl.git diff --git a/src/syncstorageimpl.cpp b/src/syncstorageimpl.cpp index 4ddc723..19f0e54 100644 --- a/src/syncstorageimpl.cpp +++ b/src/syncstorageimpl.cpp @@ -19,8 +19,8 @@ * platform project (RICP). */ +#include #include -#include #include #include #include @@ -58,6 +58,10 @@ namespace } } +/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based + * instead of current (bit error prone) poll based implementation. + */ + SyncStorageImpl::SyncStorageImpl(std::unique_ptr asyncStorage): SyncStorageImpl(std::move(asyncStorage), System::getSystem()) { @@ -67,10 +71,18 @@ SyncStorageImpl::SyncStorageImpl(std::unique_ptr pAsyncStorage, System& system): asyncStorage(std::move(pAsyncStorage)), system(system), - pFd(asyncStorage->fd()), localStatus(false), - synced(false) + synced(false), + isReady(false), + events{ asyncStorage->fd(), POLLIN, 0 }, + operationTimeout(std::chrono::steady_clock::duration::zero()) +{ +} + +void SyncStorageImpl::waitReadyAck(const std::error_code& error) { + isReady = true; + localError = error; } void SyncStorageImpl::modifyAck(const std::error_code& error) @@ -106,40 +118,76 @@ void SyncStorageImpl::verifyBackendResponse() throwExceptionForErrorCode(localError); } -void SyncStorageImpl::waitForCallback() +void SyncStorageImpl::waitForOperationCallback() { - struct pollfd events { pFd, POLLIN, 0 }; while(!synced) - if (system.poll(&events, 1, -1) > 0 && (events.revents & POLLIN)) - asyncStorage->handleEvents(); + pollAndHandleEvents(NO_TIMEOUT); +} + +void SyncStorageImpl::pollAndHandleEvents(int timeout_ms) +{ + if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN)) + asyncStorage->handleEvents(); +} + +void SyncStorageImpl::waitForReadinessCheckCallback(const std::chrono::steady_clock::duration& timeout) +{ + if (timeout == std::chrono::steady_clock::duration::zero()) + { + while (!isReady) + pollAndHandleEvents(NO_TIMEOUT); + } + else + { + auto timeout_ms(std::chrono::duration_cast(timeout).count()); + auto pollTimeout_ms(timeout_ms / 10); + std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); + while(!isReady && (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms))) + pollAndHandleEvents(pollTimeout_ms); + } } void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns) { - synced = false; + waitSdlToBeReady(ns, operationTimeout); +} + +void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout) +{ + isReady = false; asyncStorage->waitReadyAsync(ns, - std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, + std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck, this, - std::error_code())); - waitForCallback(); + std::placeholders::_1)); + waitForReadinessCheckCallback(timeout); +} + +void SyncStorageImpl::waitReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout) +{ + waitSdlToBeReady(ns, timeout); + if(!isReady) + throw RejectedBySdl("Timeout, SDL service not ready for the '" + ns + "' namespace"); verifyBackendResponse(); } void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; + asyncStorage->setAsync(ns, dataMap, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfAsync(ns, @@ -150,13 +198,14 @@ bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& old this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfNotExistsAsync(ns, @@ -166,13 +215,14 @@ bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->getAsync(ns, @@ -181,13 +231,14 @@ SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& k this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localMap; } void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAsync(ns, @@ -195,12 +246,13 @@ void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeIfAsync(ns, @@ -210,13 +262,14 @@ bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->findKeysAsync(ns, @@ -225,19 +278,52 @@ SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std:: this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); + verifyBackendResponse(); + return localKeys; +} + +SyncStorageImpl::Keys SyncStorageImpl::listKeys(const Namespace& ns, const std::string& pattern) +{ + handlePendingEvents(); + waitSdlToBeReady(ns); + synced = false; + asyncStorage->listKeys(ns, + pattern, + std::bind(&shareddatalayer::SyncStorageImpl::findKeysAck, + this, + std::placeholders::_1, + std::placeholders::_2)); + waitForOperationCallback(); verifyBackendResponse(); return localKeys; } void SyncStorageImpl::removeAll(const Namespace& ns) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAllAsync(ns, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } + +void SyncStorageImpl::handlePendingEvents() +{ + int pollRetVal = system.poll(&events, 1, 0); + + while (pollRetVal > 0 && events.revents & POLLIN) + { + asyncStorage->handleEvents(); + pollRetVal = system.poll(&events, 1, 0); + } +} + +void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout) +{ + operationTimeout = timeout; +}