Add definable timeout for SyncStorage APIs
[ric-plt/sdl.git] / src / syncstorageimpl.cpp
index 4ddc723..76c1546 100644 (file)
@@ -19,8 +19,8 @@
  * platform project (RICP).
 */
 
+#include <chrono>
 #include <sstream>
-#include <sys/poll.h>
 #include <sdl/asyncstorage.hpp>
 #include <sdl/backenderror.hpp>
 #include <sdl/errorqueries.hpp>
@@ -58,6 +58,10 @@ namespace
     }
 }
 
+/* TODO: This synchronous API implementation could probably be refactored to be boost::asio based
+ * instead of current (bit error prone) poll based implementation.
+ */
+
 SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> asyncStorage):
     SyncStorageImpl(std::move(asyncStorage), System::getSystem())
 {
@@ -67,10 +71,17 @@ SyncStorageImpl::SyncStorageImpl(std::unique_ptr<AsyncStorage> pAsyncStorage,
                                  System& system):
     asyncStorage(std::move(pAsyncStorage)),
     system(system),
-    pFd(asyncStorage->fd()),
     localStatus(false),
-    synced(false)
+    synced(false),
+    isReady(false),
+    events{ asyncStorage->fd(), POLLIN, 0 },
+    operationTimeout(std::chrono::steady_clock::duration::zero())
+{
+}
+
+void SyncStorageImpl::waitReadyAck(const std::error_code&)
 {
+    isReady = true;
 }
 
 void SyncStorageImpl::modifyAck(const std::error_code& error)
@@ -106,40 +117,62 @@ void SyncStorageImpl::verifyBackendResponse()
         throwExceptionForErrorCode(localError);
 }
 
-void SyncStorageImpl::waitForCallback()
+void SyncStorageImpl::waitForOperationCallback()
 {
-    struct pollfd events { pFd, POLLIN, 0 };
     while(!synced)
-        if (system.poll(&events, 1, -1) > 0 && (events.revents & POLLIN))
-            asyncStorage->handleEvents();
+        pollAndHandleEvents(NO_TIMEOUT);
+}
+
+void SyncStorageImpl::pollAndHandleEvents(int timeout_ms)
+{
+    if (system.poll(&events, 1, timeout_ms) > 0 && (events.revents & POLLIN))
+        asyncStorage->handleEvents();
+}
+
+void SyncStorageImpl::waitForReadinessCheckCallback()
+{
+    if (operationTimeout == std::chrono::steady_clock::duration::zero())
+    {
+        while (!isReady)
+            pollAndHandleEvents(NO_TIMEOUT);
+    }
+    else
+    {
+        int pollTimeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(operationTimeout).count() / 10;
+        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
+        while(!isReady && (std::chrono::steady_clock::now() - start < operationTimeout))
+            pollAndHandleEvents(pollTimeout_ms);
+    }
 }
 
 void SyncStorageImpl::waitSdlToBeReady(const Namespace& ns)
 {
-    synced = false;
+    isReady = false;
     asyncStorage->waitReadyAsync(ns,
-                                 std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
+                                 std::bind(&shareddatalayer::SyncStorageImpl::waitReadyAck,
                                            this,
                                            std::error_code()));
-    waitForCallback();
-    verifyBackendResponse();
+    waitForReadinessCheckCallback();
 }
 
 void SyncStorageImpl::set(const Namespace& ns, const DataMap& dataMap)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
+
     asyncStorage->setAsync(ns,
                            dataMap,
                            std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
                                      this,
                                      std::placeholders::_1));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
 }
 
 bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& oldData, const Data& newData)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->setIfAsync(ns,
@@ -150,13 +183,14 @@ bool SyncStorageImpl::setIf(const Namespace& ns, const Key& key, const Data& old
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
     return localStatus;
 }
 
 bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const Data& data)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->setIfNotExistsAsync(ns,
@@ -166,13 +200,14 @@ bool SyncStorageImpl::setIfNotExists(const Namespace& ns, const Key& key, const
                                                 this,
                                                 std::placeholders::_1,
                                                 std::placeholders::_2));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
     return localStatus;
 }
 
 SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& keys)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->getAsync(ns,
@@ -181,13 +216,14 @@ SyncStorageImpl::DataMap SyncStorageImpl::get(const Namespace& ns, const Keys& k
                                      this,
                                      std::placeholders::_1,
                                      std::placeholders::_2));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
     return localMap;
 }
 
 void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->removeAsync(ns,
@@ -195,12 +231,13 @@ void SyncStorageImpl::remove(const Namespace& ns, const Keys& keys)
                               std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
                                         this,
                                         std::placeholders::_1));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
 }
 
 bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data& data)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->removeIfAsync(ns,
@@ -210,13 +247,14 @@ bool SyncStorageImpl::removeIf(const Namespace& ns, const Key& key, const Data&
                                           this,
                                           std::placeholders::_1,
                                           std::placeholders::_2));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
     return localStatus;
 }
 
 SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::string& keyPrefix)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->findKeysAsync(ns,
@@ -225,19 +263,36 @@ SyncStorageImpl::Keys SyncStorageImpl::findKeys(const Namespace& ns, const std::
                                           this,
                                           std::placeholders::_1,
                                           std::placeholders::_2));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
     return localKeys;
 }
 
 void SyncStorageImpl::removeAll(const Namespace& ns)
 {
+    handlePendingEvents();
     waitSdlToBeReady(ns);
     synced = false;
     asyncStorage->removeAllAsync(ns,
                                  std::bind(&shareddatalayer::SyncStorageImpl::modifyAck,
                                            this,
                                            std::placeholders::_1));
-    waitForCallback();
+    waitForOperationCallback();
     verifyBackendResponse();
 }
+
+void SyncStorageImpl::handlePendingEvents()
+{
+    int pollRetVal = system.poll(&events, 1, 0);
+
+    while (pollRetVal > 0 && events.revents & POLLIN)
+    {
+        asyncStorage->handleEvents();
+        pollRetVal = system.poll(&events, 1, 0);
+    }
+}
+
+void SyncStorageImpl::setOperationTimeout(const std::chrono::steady_clock::duration& timeout)
+{
+    operationTimeout = timeout;
+}