X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fsyncstorageimpl.cpp;h=3eaeb03514c6f58d80b52c97b3eb640c1742e7df;hb=d565df6613d97c08664a00f3fbc7cb5fdcf233f6;hp=b73029cc8e08923d5c20d56e695f9f6ac1582001;hpb=ef2bf51d04aaf01fa0cabdcaf905b23423067662;p=ric-plt%2Fsdl.git diff --git a/src/syncstorageimpl.cpp b/src/syncstorageimpl.cpp index b73029c..3eaeb03 100644 --- a/src/syncstorageimpl.cpp +++ b/src/syncstorageimpl.cpp @@ -14,8 +14,13 @@ limitations under the License. */ +/* + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). +*/ + +#include #include -#include #include #include #include @@ -53,6 +58,10 @@ namespace } } +/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based + * instead of current (bit error prone) poll based implementation. + */ + SyncStorageImpl::SyncStorageImpl(std::unique_ptr asyncStorage): SyncStorageImpl(std::move(asyncStorage), System::getSystem()) { @@ -62,10 +71,18 @@ SyncStorageImpl::SyncStorageImpl(std::unique_ptr pAsyncStorage, System& system): asyncStorage(std::move(pAsyncStorage)), system(system), - pFd(asyncStorage->fd()), localStatus(false), - synced(false) + synced(false), + isReady(false), + events{ asyncStorage->fd(), POLLIN, 0 }, + operationTimeout(std::chrono::steady_clock::duration::zero()) +{ +} + +void SyncStorageImpl::waitReadyAck(const std::error_code& error) { + isReady = true; + localError = error; } void SyncStorageImpl::modifyAck(const std::error_code& error) @@ -101,40 +118,76 @@ void SyncStorageImpl::verifyBackendResponse() throwExceptionForErrorCode(localError); } -void SyncStorageImpl::waitForCallback() +void SyncStorageImpl::waitForOperationCallback() { - struct pollfd events { pFd, POLLIN, 0 }; while(!synced) - if (system.poll(&events, 1, -1) > 0 && (events.revents & POLLIN)) - asyncStorage->handleEvents(); + pollAndHandleEvents(NO_TIMEOUT); +} + +void SyncStorageImpl::pollAndHandleEvents(int timeout_ms) +{ + if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN)) + asyncStorage->handleEvents(); +} + +void SyncStorageImpl::waitForReadinessCheckCallback(const std::chrono::steady_clock::duration& timeout) +{ + if (timeout == std::chrono::steady_clock::duration::zero()) + { + while (!isReady) + pollAndHandleEvents(NO_TIMEOUT); + } + else + { + auto timeout_ms(std::chrono::duration_cast(timeout).count()); + auto pollTimeout_ms(timeout_ms / 10); + std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); + while(!isReady && (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms))) + pollAndHandleEvents(pollTimeout_ms); + } } void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns) { - synced = false; + waitSdlToBeReady(ns, operationTimeout); +} + +void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout) +{ + isReady = false; asyncStorage->waitReadyAsync(ns, - std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, + std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck, this, - std::error_code())); - waitForCallback(); + std::placeholders::_1)); + waitForReadinessCheckCallback(timeout); +} + +void SyncStorageImpl::waitReady(const Namespace& ns, const std::chrono::steady_clock::duration& timeout) +{ + waitSdlToBeReady(ns, timeout); + if(!isReady) + throw RejectedBySdl("Timeout, SDL service not ready for the '" + ns + "' namespace"); verifyBackendResponse(); } void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; + asyncStorage->setAsync(ns, dataMap, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfAsync(ns, @@ -145,13 +198,14 @@ bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& old this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->setIfNotExistsAsync(ns, @@ -161,13 +215,14 @@ bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->getAsync(ns, @@ -176,13 +231,14 @@ SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& k this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localMap; } void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAsync(ns, @@ -190,12 +246,13 @@ void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys) std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeIfAsync(ns, @@ -205,13 +262,14 @@ bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localStatus; } SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->findKeysAsync(ns, @@ -220,19 +278,36 @@ SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std:: this, std::placeholders::_1, std::placeholders::_2)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); return localKeys; } void SyncStorageImpl::removeAll(const Namespace& ns) { + handlePendingEvents(); waitSdlToBeReady(ns); synced = false; asyncStorage->removeAllAsync(ns, std::bind(&shareddatalayer::SyncStorageImpl::modifyAck, this, std::placeholders::_1)); - waitForCallback(); + waitForOperationCallback(); verifyBackendResponse(); } + +void SyncStorageImpl::handlePendingEvents() +{ + int pollRetVal = system.poll(&events, 1, 0); + + while (pollRetVal > 0 && events.revents & POLLIN) + { + asyncStorage->handleEvents(); + pollRetVal = system.poll(&events, 1, 0); + } +} + +void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout) +{ + operationTimeout = timeout; +}