Update O2app start with global ocloud ID
[pti/o2.git] / o2common / service / messagebus.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # pylint: disable=broad-except, attribute-defined-outside-init
16 from __future__ import annotations
17 from typing import Callable, Dict, List, Union, Type, TYPE_CHECKING
18 from o2common.domain import commands, events
19
20 if TYPE_CHECKING:
21     from . import unit_of_work
22
23 from o2common.helper import o2logging
24 logger = o2logging.get_logger(__name__)
25
26 Message = Union[commands.Command, events.Event]
27
28
29 class MessageBus:
30     __instance: MessageBus = None
31
32     @staticmethod
33     def set_instance(instance: MessageBus):
34         MessageBus.__instance = instance
35
36     @staticmethod
37     def get_instance() -> MessageBus:
38         return MessageBus.__instance
39
40     def __init__(
41         self,
42         uow: unit_of_work.AbstractUnitOfWork,
43         event_handlers: Dict[Type[events.Event], List[Callable]],
44         command_handlers: Dict[Type[commands.Command], Callable],
45     ):
46         self.uow = uow
47         self.event_handlers = event_handlers
48         self.command_handlers = command_handlers
49
50     def handle(self, message: Message):
51         self.queue = [message]
52         while self.queue:
53             message = self.queue.pop(0)
54             if not message:
55                 continue
56             elif isinstance(message, events.Event):
57                 self.handle_event(message)
58             elif isinstance(message, commands.Command):
59                 self.handle_command(message)
60             else:
61                 raise Exception(f"{message} was not an Event or Command")
62
63     def handle_event(self, event: events.Event):
64         for handler in self.event_handlers[type(event)]:
65             try:
66                 logger.debug("handling event %s with handler %s",
67                              event, handler)
68                 handler(event)
69                 self.queue.extend(self.uow.collect_new_events())
70             except Exception:
71                 logger.exception("Exception handling event %s", event)
72                 continue
73
74     def handle_command(self, command: commands.Command):
75         logger.debug("handling command %s", command)
76         try:
77             handler = self.command_handlers[type(command)]
78             handler(command)
79             self.queue.extend(self.uow.collect_new_events())
80         except Exception as ex:
81             logger.exception("Exception handling command %s", command)
82             raise ex