Add polling worker for watcher to probe 83/6983/1
authorBin Yang <bin.yang@windriver.com>
Wed, 3 Nov 2021 11:13:52 +0000 (19:13 +0800)
committerBin Yang <bin.yang@windriver.com>
Wed, 3 Nov 2021 14:54:51 +0000 (22:54 +0800)
Issue-ID: INF-196
Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: I2dbe061b5183fe47d60ca7421d5844181e7f2824

o2ims/entrypoints/flask_application.py
o2ims/service/watcher/base.py
o2ims/service/watcher/executor.py [new file with mode: 0644]
o2ims/service/watcher/worker.py [new file with mode: 0644]
requirements.txt
tests/unit/test_watcher.py

index 8965a30..bb791bc 100644 (file)
@@ -19,10 +19,12 @@ from flask import Flask, jsonify
 # from o2ims.service.handlers import InvalidResourceType\r
 from o2ims import bootstrap, config\r
 from o2ims.views import ocloud_view\r
+from o2ims.service.watcher.executor import start_watchers\r
 \r
 app = Flask(__name__)\r
 bus = bootstrap.bootstrap()\r
 apibase = config.get_o2ims_api_base()\r
+start_watchers()\r
 \r
 \r
 @app.route(apibase, methods=["GET"])\r
index 4ea23ff..26c67da 100644 (file)
@@ -26,11 +26,17 @@ class BaseWatcher(object):
         super().__init__()\r
         self._client = client\r
 \r
+    def targetname(self) -> str:\r
+        return self._targetname()\r
+\r
     def probe(self):\r
         self._probe()\r
 \r
     def _probe(self):\r
-        pass\r
+        raise NotImplementedError\r
+\r
+    def _targetname(self):\r
+        raise NotImplementedError\r
 \r
 \r
 class OcloudWather(BaseWatcher):\r
@@ -39,6 +45,9 @@ class OcloudWather(BaseWatcher):
         super().__init__(ocloud_client)\r
         self._repo = repo\r
 \r
+    def _targetname(self):\r
+        return "ocloud"\r
+\r
     def _probe(self):\r
         ocloudmodel = self._client.get(None)\r
         if ocloudmodel:\r
@@ -58,11 +67,25 @@ class OcloudWather(BaseWatcher):
                 self._repo.update(localmodel)\r
 \r
 \r
-class ResourcePoolWatcher(object):\r
+class DmsWatcher(BaseWatcher):\r
+    def __init__(self, client: BaseClient) -> None:\r
+        super().__init__(client)\r
+\r
+    def _targetname(self):\r
+        return "dms"\r
+\r
+\r
+class ResourcePoolWatcher(BaseWatcher):\r
     def __init__(self) -> None:\r
         super().__init__()\r
 \r
+    def _targetname(self):\r
+        return "ocloud"\r
 \r
-class ResourceWatcher(object):\r
+\r
+class ResourceWatcher(BaseWatcher):\r
     def __init__(self) -> None:\r
         super().__init__()\r
+\r
+    def _targetname(self):\r
+        return "resource"\r
diff --git a/o2ims/service/watcher/executor.py b/o2ims/service/watcher/executor.py
new file mode 100644 (file)
index 0000000..607bdc9
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import cotyledon\r
+\r
+from o2ims.service.watcher.worker import PollWorker\r
+from o2ims.service.watcher.base import OcloudWather\r
+from o2ims.service.watcher.base import DmsWatcher\r
+\r
+import logging\r
+logger = logging.getLogger(__name__)\r
+\r
+\r
+class WatcherService(cotyledon.Service):\r
+    def __init__(self, worker_id, args) -> None:\r
+        super().__init__(worker_id)\r
+        self.args = args\r
+        self.worker = PollWorker()\r
+\r
+    def run(self):\r
+        try:\r
+            self.worker.add_watcher(OcloudWather())\r
+            self.worker.add_watcher(DmsWatcher())\r
+            self.worker.start()\r
+        except Exception as ex:\r
+            logger.warning(ex.message)\r
+        finally:\r
+            self.worker.stop()\r
+\r
+\r
+def start_watchers(sm=None):\r
+    watchersm = sm if sm else cotyledon.ServiceManager()\r
+    watchersm.add(WatcherService, workers=1, args=())\r
+    return watchersm\r
diff --git a/o2ims/service/watcher/worker.py b/o2ims/service/watcher/worker.py
new file mode 100644 (file)
index 0000000..a348074
--- /dev/null
@@ -0,0 +1,64 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import time\r
+import sched\r
+from o2ims.service.watcher.base import BaseWatcher\r
+\r
+import logging\r
+logger = logging.getLogger(__name__)\r
+\r
+\r
+class PollWorker(object):\r
+    def __init__(self, interval=10) -> None:\r
+        super().__init__()\r
+        self.watchers = {}\r
+        self.schedinstance = sched.scheduler(time.time, time.sleep)\r
+        self.schedinterval = interval\r
+        self._stopped = True\r
+\r
+    def set_interval(self, interval):\r
+        if interval > 0:\r
+            self.schedinterval = interval\r
+        else:\r
+            raise Exception("Invalid interval:" + interval)\r
+\r
+    def add_watcher(self, watcher: BaseWatcher):\r
+        self.watchers[watcher.targetname()] = watcher\r
+\r
+    def _repeat(self):\r
+        logger.debug("_repeat started")\r
+        if self._stopped:\r
+            return\r
+        for w in self.watchers.keys():\r
+            try:\r
+                logger.debug("about to probe:"+w)\r
+                self.watchers[w].probe()\r
+            except Exception as ex:\r
+                logger.warning(ex.message)\r
+                continue\r
+        self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
+\r
+    # note the sched run will block current thread\r
+    def start(self):\r
+        self._stopped = False\r
+        logger.debug('about to start sched task')\r
+        self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
+        self.schedinstance.run()\r
+\r
+    def stop(self):\r
+        self._stopped = True\r
+\r
+\r
+defaultworker = PollWorker()\r
index acbfe10..ca71bc2 100644 (file)
@@ -2,6 +2,7 @@ flask
 sqlalchemy\r
 redis\r
 psycopg2-binary\r
+cotyledon\r
 \r
 Cython>=3.0a1\r
 \r
index ec28519..4cdfb76 100644 (file)
@@ -12,6 +12,7 @@
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+import time\r
 from datetime import datetime\r
 import json\r
 from typing import List\r
@@ -20,9 +21,11 @@ import pytest
 from o2ims.domain import ocloud\r
 from o2ims import config\r
 import uuid\r
-from o2ims.service.watcher.base import OcloudWather\r
+from o2ims.service.watcher.base import BaseWatcher, OcloudWather\r
 from o2ims.domain import stx_object as ocloudModel\r
 from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.service.watcher import worker\r
+from o2ims.service.watcher.executor import start_watchers\r
 \r
 class FakeOcloudClient(BaseClient):\r
     def __init__(self):\r
@@ -69,3 +72,41 @@ def test_probe_new_ocloud():
     ocloudwatcher.probe()\r
     assert len(fakeRepo.oclouds) == 1\r
     assert fakeRepo.oclouds[0].name == "stx1"\r
+\r
+def test_default_worker():\r
+\r
+    class FakeOCloudWatcher(BaseWatcher):\r
+        def __init__(self, client: BaseClient,\r
+                     repo: OcloudRepository) -> None:\r
+            super().__init__(client)\r
+            self.fakeOcloudWatcherCounter = 0\r
+            self._client = client\r
+            self._repo = repo\r
+\r
+        def _targetname(self):\r
+            return "fakeocloudwatcher"\r
+        \r
+        def _probe(self):\r
+            self.fakeOcloudWatcherCounter += 1\r
+            # hacking to stop the blocking sched task\r
+            if self.fakeOcloudWatcherCounter > 2:\r
+                worker.defaultworker.stop()\r
+\r
+\r
+    fakeRepo = FakeOcloudRepo()\r
+    fakeClient = FakeOcloudClient()\r
+    fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)\r
+\r
+    worker.defaultworker.set_interval(1)\r
+    worker.defaultworker.add_watcher(fakewatcher)\r
+    assert fakewatcher.fakeOcloudWatcherCounter == 0\r
+\r
+    count1 = fakewatcher.fakeOcloudWatcherCounter\r
+    worker.defaultworker.start()\r
+    time.sleep(20)\r
+    assert fakewatcher.fakeOcloudWatcherCounter > count1\r
+\r
+    # assumed hacking: probe has stopped the sched task\r
+    count3 = fakewatcher.fakeOcloudWatcherCounter\r
+    time.sleep(3)\r
+    assert fakewatcher.fakeOcloudWatcherCounter == count3\r