INF-303 Add Infrastructure Monitoring Fault Service; INF-305 update inventory api...
[pti/o2.git] / o2common / service / watcher / worker.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import time
16 import sched
17 # from o2common.service.unit_of_work import AbstractUnitOfWork
18 from o2common.service.watcher.base import WatcherTree
19
20 from o2common.helper import o2logging
21 logger = o2logging.get_logger(__name__)
22
23
24 class PollWorker(object):
25     def __init__(self, interval=10, bus=None) -> None:
26         super().__init__()
27         self.watchers = []
28         self.schedinstance = sched.scheduler(time.time, time.sleep)
29         self.schedinterval = interval
30         self._stopped = True
31         self._bus = bus
32
33     def set_interval(self, interval):
34         if interval > 0:
35             self.schedinterval = interval
36         else:
37             raise Exception("Invalid interval:" + interval)
38
39     def add_watcher(self, watcher: WatcherTree):
40         self.watchers.append(watcher)
41
42     def _repeat(self):
43         logger.debug("_repeat started")
44         if self._stopped:
45             return
46         for w in self.watchers:
47             try:
48                 # logger.debug("about to probe:"+w)
49                 w.probe(None)
50             except Exception as ex:
51                 logger.warning("Worker raises exception %s: %s -  %s "
52                                % (w, type(ex), str(ex)))
53                 continue
54
55         # handle events
56         if self._bus is not None:
57             events = self._bus.uow.collect_new_events()
58             for event in events:
59                 self._bus.handle(event)
60
61         self.schedinstance.enter(self.schedinterval, 1, self._repeat)
62
63     # note the sched run will block current thread
64     def start(self):
65         self._stopped = False
66         logger.debug('about to start sched task')
67         self.schedinstance.enter(self.schedinterval, 1, self._repeat)
68         self.schedinstance.run()
69
70     def stop(self):
71         self._stopped = True