1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2022 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
23 from ricsdl.configuration import _Configuration
24 from ricsdl.configuration import DbBackendType
28 def config_fixture(request, monkeypatch):
29 monkeypatch.setenv('DBAAS_SERVICE_HOST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt')
30 monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000,10001')
31 monkeypatch.setenv('DBAAS_SERVICE_SENTINEL_PORT', '11000,11001')
32 monkeypatch.setenv('DBAAS_MASTER_NAME', 'my-master-0,my-master-1')
33 monkeypatch.setenv('DBAAS_CLUSTER_ADDR_LIST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt')
34 request.cls.config = _Configuration(fake_db_backend=None)
37 def fake_db_config_fixture(request, monkeypatch):
38 monkeypatch.delenv('DBAAS_SERVICE_HOST', raising=False)
39 monkeypatch.delenv('DBAAS_SERVICE_PORT', raising=False)
40 monkeypatch.delenv('DBAAS_SERVICE_SENTINEL_PORT', raising=False)
41 monkeypatch.delenv('DBAAS_MASTER_NAME', raising=False)
42 monkeypatch.delenv('DBAAS_CLUSTER_ADDR_LIST', raising=False)
43 request.cls.config = _Configuration(fake_db_backend='dict')
45 class TestConfiguration:
46 def test_get_params_function_returns_read_configuration(self, config_fixture):
47 expected_config = _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
48 db_ports=['10000','10001'],
49 db_sentinel_ports=['11000','11001'],
50 db_sentinel_master_names=['my-master-0','my-master-1'],
51 db_cluster_addrs=['service-ricplt-dbaas-tcp-cluster-0.ricplt','service-ricplt-dbaas-tcp-cluster-1.ricplt'],
52 db_type=DbBackendType.REDIS)
53 assert expected_config == self.config.get_params()
55 def test_get_params_function_can_return_fake_db_configuration(self, fake_db_config_fixture):
56 expected_config = _Configuration.Params(db_host='', db_ports=[],
58 db_sentinel_master_names=[],
60 db_type=DbBackendType.FAKE_DICT)
61 assert expected_config == self.config.get_params()
63 def test_get_event_separator_function_return_expected_separator(self, config_fixture):
64 assert "___" == _Configuration.get_event_separator()
66 def test_get_params_function_can_raise_exception_if_wrong_fake_db_type(self):
67 with pytest.raises(ValueError, match=r"Configuration error"):
68 _Configuration(fake_db_backend='bad value')
71 def test_configuration_object_string_representation(self, config_fixture):
72 expected_config_info = {'DB host': 'service-ricplt-dbaas-tcp-cluster-0.ricplt',
73 'DB ports': ['10000','10001'],
74 'DB master sentinels': ['my-master-0','my-master-1'],
75 'DB sentinel ports': ['11000','11001'],
76 'DB cluster addresses': ['service-ricplt-dbaas-tcp-cluster-0.ricplt','service-ricplt-dbaas-tcp-cluster-1.ricplt'],
78 assert str(self.config) == str(expected_config_info)
80 def test_configuration_object_string_representation_if_fake_db(self, fake_db_config_fixture):
81 expected_config_info = {'DB host': '',
83 'DB master sentinels': [],
84 'DB sentinel ports': [],
85 'DB cluster addresses': [],
86 'DB type': 'FAKE_DICT'}
87 assert str(self.config) == str(expected_config_info)
89 def test_complete_configuration_if_less_ports_than_addresses(self, monkeypatch):
90 monkeypatch.setenv('DBAAS_SERVICE_HOST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt')
91 monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000')
92 monkeypatch.setenv('DBAAS_CLUSTER_ADDR_LIST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt')
94 expected_config = _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
95 db_ports=['10000','10000'],
97 db_sentinel_master_names=[],
98 db_cluster_addrs=['service-ricplt-dbaas-tcp-cluster-0.ricplt','service-ricplt-dbaas-tcp-cluster-1.ricplt'],
99 db_type=DbBackendType.REDIS)
100 assert expected_config == _Configuration(fake_db_backend=None).get_params()
102 def test_complete_configuration_if_less_sentinel_ports_than_addresses(self, monkeypatch):
103 monkeypatch.setenv('DBAAS_SERVICE_HOST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt')
104 monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000,10001')
105 monkeypatch.setenv('DBAAS_SERVICE_SENTINEL_PORT', '11000')
106 monkeypatch.setenv('DBAAS_MASTER_NAME', 'my-master-0,my-master-1')
107 monkeypatch.setenv('DBAAS_CLUSTER_ADDR_LIST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt')
109 expected_config = _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
110 db_ports=['10000','10001'],
111 db_sentinel_ports=['11000','11000'],
112 db_sentinel_master_names=['my-master-0','my-master-1'],
113 db_cluster_addrs=['service-ricplt-dbaas-tcp-cluster-0.ricplt','service-ricplt-dbaas-tcp-cluster-1.ricplt'],
114 db_type=DbBackendType.REDIS)
115 assert expected_config == _Configuration(fake_db_backend=None).get_params()
117 def test_complete_configuration_if_less_sentinel_names_than_addresses(self, monkeypatch):
118 monkeypatch.setenv('DBAAS_SERVICE_HOST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt')
119 monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000,10001')
120 monkeypatch.setenv('DBAAS_SERVICE_SENTINEL_PORT', '11000,11001')
121 monkeypatch.setenv('DBAAS_MASTER_NAME', 'my-master-0')
122 monkeypatch.setenv('DBAAS_CLUSTER_ADDR_LIST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt')
124 expected_config = _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
125 db_ports=['10000','10001'],
126 db_sentinel_ports=['11000','11001'],
127 db_sentinel_master_names=['my-master-0','my-master-0'],
128 db_cluster_addrs=['service-ricplt-dbaas-tcp-cluster-0.ricplt','service-ricplt-dbaas-tcp-cluster-1.ricplt'],
129 db_type=DbBackendType.REDIS)
130 assert expected_config == _Configuration(fake_db_backend=None).get_params()