1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from retry import retry
17 from typing import Callable
19 from o2common.adapter.notifications import AbstractNotifications,\
21 from o2common.adapter import redis_eventpublisher
22 from o2common.service import unit_of_work
23 from o2common.service import messagebus
25 from o2app.service import handlers
26 from o2app.adapter.unit_of_work import SqlAlchemyUnitOfWork
28 from o2ims.adapter import orm as o2ims_orm
29 from o2dms.adapter import orm as o2dms_orm
31 from o2common.helper import o2logging
32 logger = o2logging.get_logger(__name__)
@retry(tries=100, delay=2, backoff=1)
def wait_for_db_ready(engine):
    """Block until a connection to the database can be opened.

    One attempt opens a connection on ``engine`` and closes it again right
    away. Any exception raised while connecting propagates to the ``retry``
    decorator, which is configured with ``tries=100, delay=2, backoff=1``.

    Args:
        engine: Engine to probe. Presumably a SQLAlchemy ``Engine`` (it is
            obtained via ``session.get_bind()`` by the caller) -- confirm.
    """
    logger.info("Wait for DB ready ...")
    # Actually touch the database: without a probe nothing in this function
    # can fail, so the retry would never fire and readiness would be
    # reported unconditionally. The ``with`` block closes the probe
    # connection instead of leaving it open.
    with engine.connect():
        pass
    logger.info("DB is ready")
@retry(tries=3, delay=2)
def wait_for_mappers_ready(engine):
    """Start the o2ims and o2dms ORM mappers against ``engine``.

    The o2ims starter runs first, then the o2dms one. An exception from
    either is left to the ``retry`` decorator (``tries=3, delay=2``).
    """
    logger.info("Wait for mapper ready ...")
    mapper_starters = (
        o2ims_orm.start_o2ims_mappers,
        o2dms_orm.start_o2dms_mappers,
    )
    for start_mappers in mapper_starters:
        start_mappers(engine)
    logger.info("mapper is ready")
53 start_orm: bool = True,
54 uow: unit_of_work.AbstractUnitOfWork = SqlAlchemyUnitOfWork(),
55 notifications: AbstractNotifications = None,
56 publish: Callable = redis_eventpublisher.publish,
57 ) -> messagebus.MessageBus:
59 if notifications is None:
60 notifications = SmoO2Notifications()
# Use the engine that the unit of work's session is bound to
# (this is the default engine when the default uow is used).
65 engine = uow.session.get_bind()
67 wait_for_db_ready(engine)
68 wait_for_mappers_ready(engine)
70 dependencies = {"uow": uow, "notifications": notifications,
72 injected_event_handlers = {
74 inject_dependencies(handler, dependencies)
75 for handler in event_handlers
77 for event_type, event_handlers in handlers.EVENT_HANDLERS.items()
79 injected_command_handlers = {
80 command_type: inject_dependencies(handler, dependencies)
81 for command_type, handler in handlers.COMMAND_HANDLERS.items()
84 bus = messagebus.MessageBus(
86 event_handlers=injected_event_handlers,
87 command_handlers=injected_command_handlers,
89 messagebus.MessageBus.set_instance(bus)
def inject_dependencies(handler, dependencies):
    """Bind the dependencies a handler declares, leaving only the message.

    The handler's signature is inspected, and every entry of
    ``dependencies`` whose key matches one of its parameter names is passed
    to it as a keyword argument. Entries the handler does not name are
    ignored.

    Args:
        handler: Callable that takes the message as its first positional
            argument, plus any subset of the dependency names.
        dependencies: Mapping of parameter name to the object to inject.

    Returns:
        A one-argument callable equivalent to
        ``lambda message: handler(message, **deps)``.
    """
    params = inspect.signature(handler).parameters
    # Filter once, at wiring time, so each later dispatch is a plain call.
    deps = {
        name: dependency
        for name, dependency in dependencies.items()
        if name in params
    }
    return lambda message: handler(message, **deps)