1 # pylint: disable=redefined-outer-name
\r
5 from pathlib import Path
\r
10 from sqlalchemy import create_engine
\r
11 from sqlalchemy.orm import sessionmaker, clear_mappers
\r
12 from tenacity import retry, stop_after_delay
\r
14 from o2ims.adapter.orm import metadata, start_o2ims_mappers
\r
15 from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers
\r
16 from o2ims import config
\r
17 from o2ims.domain import stx_object as ocloudModel
\r
21 def in_memory_sqlite_db():
\r
22 engine = create_engine("sqlite:///:memory:")
\r
23 # engine = create_engine("sqlite:///:memory:", echo=True)
\r
24 metadata.create_all(engine)
\r
29 def sqlite_session_factory(in_memory_sqlite_db):
\r
30 yield sessionmaker(bind=in_memory_sqlite_db)
\r
35 start_o2ims_mappers()
\r
36 start_o2ims_stx_mappers()
\r
41 @retry(stop=stop_after_delay(10))
\r
42 def wait_for_postgres_to_come_up(engine):
\r
43 return engine.connect()
\r
46 @retry(stop=stop_after_delay(10))
\r
47 def wait_for_webapp_to_come_up():
\r
48 return requests.get(config.get_api_url())
\r
51 @retry(stop=stop_after_delay(10))
\r
52 def wait_for_redis_to_come_up():
\r
53 r = redis.Redis(**config.get_redis_host_and_port())
\r
57 @pytest.fixture(scope="session")
\r
59 engine = create_engine(config.get_postgres_uri(), isolation_level="SERIALIZABLE")
\r
60 wait_for_postgres_to_come_up(engine)
\r
61 metadata.create_all(engine)
\r
66 def postgres_session_factory(postgres_db):
\r
67 yield sessionmaker(bind=postgres_db)
\r
71 def postgres_session(postgres_session_factory):
\r
72 return postgres_session_factory()
\r
77 (Path(__file__).parent / "../src/o2ims/entrypoints/flask_application.py").touch()
\r
79 wait_for_webapp_to_come_up()
\r
83 def restart_redis_pubsub():
\r
84 wait_for_redis_to_come_up()
\r
85 if not shutil.which("docker-compose"):
\r
86 print("skipping restart, assumes running in container")
\r
89 ["docker-compose", "restart", "-t", "0", "redis_pubsub"],
\r