From: Bin Yang Date: Wed, 3 Nov 2021 11:13:52 +0000 (+0800) Subject: Add polling worker for watcher to probe X-Git-Tag: 1.0.0~36 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=bff814cb30e501eb1e54aecb8110a78d41e7bdb0;p=pti%2Fo2.git Add polling worker for watcher to probe Issue-ID: INF-196 Signed-off-by: Bin Yang Change-Id: I2dbe061b5183fe47d60ca7421d5844181e7f2824 --- diff --git a/o2ims/entrypoints/flask_application.py b/o2ims/entrypoints/flask_application.py index 8965a30..bb791bc 100644 --- a/o2ims/entrypoints/flask_application.py +++ b/o2ims/entrypoints/flask_application.py @@ -19,10 +19,12 @@ from flask import Flask, jsonify # from o2ims.service.handlers import InvalidResourceType from o2ims import bootstrap, config from o2ims.views import ocloud_view +from o2ims.service.watcher.executor import start_watchers app = Flask(__name__) bus = bootstrap.bootstrap() apibase = config.get_o2ims_api_base() +start_watchers() @app.route(apibase, methods=["GET"]) diff --git a/o2ims/service/watcher/base.py b/o2ims/service/watcher/base.py index 4ea23ff..26c67da 100644 --- a/o2ims/service/watcher/base.py +++ b/o2ims/service/watcher/base.py @@ -26,11 +26,17 @@ class BaseWatcher(object): super().__init__() self._client = client + def targetname(self) -> str: + return self._targetname() + def probe(self): self._probe() def _probe(self): - pass + raise NotImplementedError + + def _targetname(self): + raise NotImplementedError class OcloudWather(BaseWatcher): @@ -39,6 +45,9 @@ class OcloudWather(BaseWatcher): super().__init__(ocloud_client) self._repo = repo + def _targetname(self): + return "ocloud" + def _probe(self): ocloudmodel = self._client.get(None) if ocloudmodel: @@ -58,11 +67,25 @@ class OcloudWather(BaseWatcher): self._repo.update(localmodel) -class ResourcePoolWatcher(object): +class DmsWatcher(BaseWatcher): + def __init__(self, client: BaseClient) -> None: + super().__init__(client) + + def _targetname(self): + return "dms" + + +class ResourcePoolWatcher(BaseWatcher): def __init__(self) -> None: super().__init__() + def _targetname(self): + return "ocloud" -class ResourceWatcher(object): + +class ResourceWatcher(BaseWatcher): def __init__(self) -> None: super().__init__() + + def _targetname(self): + return "resource" diff --git a/o2ims/service/watcher/executor.py b/o2ims/service/watcher/executor.py new file mode 100644 index 0000000..607bdc9 --- /dev/null +++ b/o2ims/service/watcher/executor.py @@ -0,0 +1,45 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cotyledon + +from o2ims.service.watcher.worker import PollWorker +from o2ims.service.watcher.base import OcloudWather +from o2ims.service.watcher.base import DmsWatcher + +import logging +logger = logging.getLogger(__name__) + + +class WatcherService(cotyledon.Service): + def __init__(self, worker_id, args) -> None: + super().__init__(worker_id) + self.args = args + self.worker = PollWorker() + + def run(self): + try: + self.worker.add_watcher(OcloudWather()) + self.worker.add_watcher(DmsWatcher()) + self.worker.start() + except Exception as ex: + logger.warning(ex.message) + finally: + self.worker.stop() + + +def start_watchers(sm=None): + watchersm = sm if sm else cotyledon.ServiceManager() + watchersm.add(WatcherService, workers=1, args=()) + return watchersm diff --git a/o2ims/service/watcher/worker.py b/o2ims/service/watcher/worker.py new file mode 100644 index 0000000..a348074 --- /dev/null +++ b/o2ims/service/watcher/worker.py @@ -0,0 +1,64 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import sched +from o2ims.service.watcher.base import BaseWatcher + +import logging +logger = logging.getLogger(__name__) + + +class PollWorker(object): + def __init__(self, interval=10) -> None: + super().__init__() + self.watchers = {} + self.schedinstance = sched.scheduler(time.time, time.sleep) + self.schedinterval = interval + self._stopped = True + + def set_interval(self, interval): + if interval > 0: + self.schedinterval = interval + else: + raise Exception("Invalid interval:" + interval) + + def add_watcher(self, watcher: BaseWatcher): + self.watchers[watcher.targetname()] = watcher + + def _repeat(self): + logger.debug("_repeat started") + if self._stopped: + return + for w in self.watchers.keys(): + try: + logger.debug("about to probe:"+w) + self.watchers[w].probe() + except Exception as ex: + logger.warning(ex.message) + continue + self.schedinstance.enter(self.schedinterval, 1, self._repeat) + + # note the sched run will block current thread + def start(self): + self._stopped = False + logger.debug('about to start sched task') + self.schedinstance.enter(self.schedinterval, 1, self._repeat) + self.schedinstance.run() + + def stop(self): + self._stopped = True + + +defaultworker = PollWorker() diff --git a/requirements.txt b/requirements.txt index acbfe10..ca71bc2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ flask sqlalchemy redis psycopg2-binary +cotyledon Cython>=3.0a1 diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index ec28519..4cdfb76 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from datetime import datetime import json from typing import List @@ -20,9 +21,11 @@ import pytest from o2ims.domain import ocloud from o2ims import config import uuid -from o2ims.service.watcher.base import OcloudWather +from o2ims.service.watcher.base import BaseWatcher, OcloudWather from o2ims.domain import stx_object as ocloudModel from o2ims.adapter.ocloud_repository import OcloudRepository +from o2ims.service.watcher import worker +from o2ims.service.watcher.executor import start_watchers class FakeOcloudClient(BaseClient): def __init__(self): @@ -69,3 +72,41 @@ def test_probe_new_ocloud(): ocloudwatcher.probe() assert len(fakeRepo.oclouds) == 1 assert fakeRepo.oclouds[0].name == "stx1" + +def test_default_worker(): + + class FakeOCloudWatcher(BaseWatcher): + def __init__(self, client: BaseClient, + repo: OcloudRepository) -> None: + super().__init__(client) + self.fakeOcloudWatcherCounter = 0 + self._client = client + self._repo = repo + + def _targetname(self): + return "fakeocloudwatcher" + + def _probe(self): + self.fakeOcloudWatcherCounter += 1 + # hacking to stop the blocking sched task + if self.fakeOcloudWatcherCounter > 2: + worker.defaultworker.stop() + + + fakeRepo = FakeOcloudRepo() + fakeClient = FakeOcloudClient() + fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo) + + worker.defaultworker.set_interval(1) + worker.defaultworker.add_watcher(fakewatcher) + assert fakewatcher.fakeOcloudWatcherCounter == 0 + + count1 = fakewatcher.fakeOcloudWatcherCounter + worker.defaultworker.start() + time.sleep(20) + assert fakewatcher.fakeOcloudWatcherCounter > count1 + + # assumed hacking: probe has stopped the sched task + count3 = fakewatcher.fakeOcloudWatcherCounter + time.sleep(3) + assert fakewatcher.fakeOcloudWatcherCounter == count3