Add ocloud watcher and tests
[pti/o2.git] / tests / conftest.py
1 # pylint: disable=redefined-outer-name\r
2 import shutil\r
3 import subprocess\r
4 import time\r
5 from pathlib import Path\r
6 \r
7 import pytest\r
8 import redis\r
9 import requests\r
10 from sqlalchemy import create_engine\r
11 from sqlalchemy.orm import sessionmaker, clear_mappers\r
12 from tenacity import retry, stop_after_delay\r
13 \r
14 from o2ims.adapter.orm import metadata, start_o2ims_mappers\r
15 from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers\r
16 from o2ims import config\r
17 from o2ims.domain import stx_object as ocloudModel\r
18 \r
19 \r
@pytest.fixture
def in_memory_sqlite_db():
    """Provide an in-memory SQLite engine with the o2ims schema created.

    Tip: pass ``echo=True`` to ``create_engine`` to log the emitted SQL
    while debugging.
    """
    sqlite_engine = create_engine("sqlite:///:memory:")
    metadata.create_all(bind=sqlite_engine)
    return sqlite_engine
26 \r
27 \r
@pytest.fixture
def sqlite_session_factory(in_memory_sqlite_db):
    """Yield a session factory bound to the in-memory SQLite engine."""
    factory = sessionmaker(bind=in_memory_sqlite_db)
    yield factory
31 \r
32 \r
@pytest.fixture
def mappers():
    """Start the o2ims ORM mappers for one test and clear them afterwards."""
    # Order matters: the base o2ims mappers are started before the stx ones.
    for start_mappers in (start_o2ims_mappers, start_o2ims_stx_mappers):
        start_mappers()
    yield
    # Teardown: remove every mapper so the next test starts clean.
    clear_mappers()
39 \r
40 \r
@retry(stop=stop_after_delay(10))
def wait_for_postgres_to_come_up(engine):
    """Open a connection on ``engine``, retrying on failure.

    The ``retry`` decorator keeps calling this function until it succeeds
    or the 10-second ``stop_after_delay`` budget is exhausted.

    Returns the opened connection; the caller owns it.
    """
    connection = engine.connect()
    return connection
44 \r
45 \r
@retry(stop=stop_after_delay(10))
def wait_for_webapp_to_come_up():
    """Issue a GET against the configured API URL, retrying on failure.

    The ``retry`` decorator re-invokes this function whenever it raises,
    until the 10-second ``stop_after_delay`` budget is exhausted. Any HTTP
    response counts as "up": the status code is not inspected here.

    Returns the ``requests`` response object of the first successful call.
    """
    # Without a timeout a single stalled connection would block forever and
    # the retry budget would never be re-evaluated; bound each attempt so a
    # hang turns into an exception that the retry decorator can act on.
    return requests.get(config.get_api_url(), timeout=3)
49 \r
50 \r
@retry(stop=stop_after_delay(10))
def wait_for_redis_to_come_up():
    """Ping the configured Redis server, retrying on failure.

    The ``retry`` decorator re-invokes this function whenever it raises,
    until the 10-second ``stop_after_delay`` budget is exhausted.

    Returns the result of ``ping()``.
    """
    connection_kwargs = config.get_redis_host_and_port()
    client = redis.Redis(**connection_kwargs)
    return client.ping()
55 \r
56 \r
@pytest.fixture(scope="session")
def postgres_db():
    """Provide a session-scoped Postgres engine with the schema created.

    The engine is built from ``config.get_postgres_uri()`` with
    SERIALIZABLE isolation. The fixture waits for the database to accept
    connections before creating the tables described by ``metadata``.
    """
    engine = create_engine(config.get_postgres_uri(), isolation_level="SERIALIZABLE")
    # The readiness probe hands back an open connection. Close it right away
    # so it is returned to the pool instead of staying checked out until it
    # happens to be garbage-collected.
    wait_for_postgres_to_come_up(engine).close()
    metadata.create_all(engine)
    return engine
63 \r
64 \r
@pytest.fixture
def postgres_session_factory(postgres_db):
    """Yield a session factory bound to the shared Postgres engine."""
    factory = sessionmaker(bind=postgres_db)
    yield factory
68 \r
69 \r
@pytest.fixture
def postgres_session(postgres_session_factory):
    """Yield a fresh Postgres session and close it when the test finishes.

    Closing on teardown releases the session's connection (and any open
    transaction) back to the session-scoped engine, so a test cannot leave
    a dangling transaction behind for the tests that follow.
    """
    session = postgres_session_factory()
    try:
        yield session
    finally:
        session.close()
73 \r
74 \r
@pytest.fixture
def restart_api():
    """Touch the Flask application module and wait for the API to respond.

    NOTE(review): touching the file presumably triggers the development
    server's auto-reload -- confirm against how the API is launched.
    """
    tests_dir = Path(__file__).parent
    flask_app_file = tests_dir / "../src/o2ims/entrypoints/flask_application.py"
    flask_app_file.touch()
    # Short pause before polling the API again.
    time.sleep(0.5)
    wait_for_webapp_to_come_up()
80 \r
81 \r
@pytest.fixture
def restart_redis_pubsub():
    """Restart the ``redis_pubsub`` docker-compose service when possible.

    Waits for Redis to answer first. If the ``docker-compose`` executable
    is not on PATH, the restart is skipped with a printed notice; otherwise
    the service is restarted with a zero-second stop timeout and a non-zero
    exit status raises ``subprocess.CalledProcessError``.
    """
    wait_for_redis_to_come_up()
    if shutil.which("docker-compose"):
        restart_command = ["docker-compose", "restart", "-t", "0", "redis_pubsub"]
        subprocess.run(restart_command, check=True)
    else:
        print("skipping restart, assumes running in container")