Version history
---------------
+[2.0.1] - 2020-01-08
+
+* Add a fake database backend implementation to be used only for testing
+ purposes when the real DBAAS database service is not available.
+
[2.0.0] - 2020-01-03
* Change find_keys() and find_and_get() API functions to support glob-style
# Creates SDL instance. The call creates connection to the SDL database backend.
mysdl = _try_func_return(SyncStorage)
+# Creates SDL instance what utilizes a fake database backend. Fake database is meant to
+# be used only at development phase of SDL clients. It does not provide more advanced
+# database services.
+# mysdl = _try_func_return(lambda: SyncStorage(fake_db_backend='dict'))
+
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
)
-__version__ = '2.0.0'
+__version__ = '2.0.1'
__all__ = [
"""Shared Data Layer (SDL) database backend module."""
from importlib import import_module
+from ricsdl.configuration import DbBackendType
def get_backend_instance(configuration):
"""
Select database backend solution and return and instance of it.
- For now only Redis backend solution is supported.
"""
- backend_name = 'RedisBackend'
- backend_module_name = 'redis'
+ if configuration.get_params().db_type == DbBackendType.FAKE_DICT:
+ backend_name = 'FakeDictBackend'
+ backend_module_name = 'fake_dict_db'
+ else:
+ backend_name = 'RedisBackend'
+ backend_module_name = 'redis'
package = __package__ or __name__
backend_module = import_module('.' + backend_module_name, package=package)
return instance
-def get_backend_lock_instance(ns, name, expiration, backend):
+def get_backend_lock_instance(configuration, ns, name, expiration, backend):
"""
Select database backend lock solution and return and instance of it.
- For now only Redis backend lock solution is supported.
"""
- backend_lock_name = 'RedisBackendLock'
- backend_module_name = 'redis'
+ if configuration.get_params().db_type == DbBackendType.FAKE_DICT:
+ backend_lock_name = 'FakeDictBackendLock'
+ backend_module_name = 'fake_dict_db'
+ else:
+ backend_lock_name = 'RedisBackendLock'
+ backend_module_name = 'redis'
package = __package__ or __name__
backend_module = import_module('.' + backend_module_name, package=package)
--- /dev/null
+# Copyright (c) 2019 AT&T Intellectual Property.
+# Copyright (c) 2018-2019 Nokia.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#
+
+
+"""The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
+import fnmatch
+from typing import (Dict, Set, List, Union)
+from ricsdl.configuration import _Configuration
+from .dbbackend_abc import DbBackendAbc
+from .dbbackend_abc import DbBackendLockAbc
+
+
+class FakeDictBackend(DbBackendAbc):
+ """
+ A class providing fake implementation of database backend of Shared Data Layer (SDL).
+ This class does not provide working database solution, this class can be used in testing
+ purposes only. Implementation does not provide shared database resource, SDL client sees
+ only its local local 'fake' database, which is a simple Python dictionary. Also keys are
+ stored in database under the same namespace.
+
+ Args:
+ configuration (_Configuration): SDL configuration, containing credentials to connect to
+ Redis database backend.
+ """
+ def __init__(self, configuration: _Configuration) -> None:
+ super().__init__()
+ self._db = {}
+ self._configuration = configuration
+
+ def __str__(self):
+ return str(
+ {
+ "DB type": "FAKE DB",
+ }
+ )
+
+ def close(self):
+ pass
+
+ def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
+ self._db = data_map.copy()
+
+ def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
+ if key not in self._db:
+ return False
+ db_data = self._db[key]
+ if db_data == old_data:
+ self._db[key] = new_data
+ return True
+ return False
+
+ def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
+ if key not in self._db:
+ self._db[key] = data
+ return True
+ return False
+
+ def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
+ ret = {}
+ for k in keys:
+ if k in self._db:
+ ret[k] = self._db[k]
+ return ret
+
+ def find_keys(self, ns: str, key_pattern: str) -> List[str]:
+ ret = []
+ for k in self._db:
+ if fnmatch.fnmatch(k, key_pattern):
+ ret.append(k)
+ return ret
+
+ def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
+ ret = {}
+ for key, val in self._db.items():
+ if fnmatch.fnmatch(key, key_pattern):
+ ret[key] = val
+ return ret
+
+ def remove(self, ns: str, keys: List[str]) -> None:
+ for key in keys:
+ self._db.pop(key, None)
+
+ def remove_if(self, ns: str, key: str, data: bytes) -> bool:
+ if key in self._db:
+ db_data = self._db[key]
+ if db_data == data:
+ self._db.pop(key)
+ return True
+ return False
+
+ def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
+ if group in self._db:
+ self._db[group] = self._db[group] | members.copy()
+ else:
+ self._db[group] = members.copy()
+
+ def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
+ if group not in self._db:
+ return
+ for member in members:
+ self._db[group].discard(member)
+
+ def remove_group(self, ns: str, group: str) -> None:
+ self._db.pop(group, None)
+
+ def get_members(self, ns: str, group: str) -> Set[bytes]:
+ return self._db.get(group, set())
+
+ def is_member(self, ns: str, group: str, member: bytes) -> bool:
+ if group not in self._db:
+ return False
+ if member in self._db[group]:
+ return True
+ return False
+
+ def group_size(self, ns: str, group: str) -> int:
+ if group not in self._db:
+ return 0
+ return len(self._db[group])
+
+
+class FakeDictBackendLock(DbBackendLockAbc):
+ """
+ A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
+ This class does not provide working database solution, this class can be used in testing
+ purposes only. Implementation does not provide shared database resource, SDL client sees
+ only its local local 'fake' database, which is a simple Python dictionary. Also keys are
+ stored in database under the same namespace.
+ Args:
+ ns (str): Namespace under which this lock is targeted.
+ name (str): Lock name, identifies the lock key in a Redis database backend.
+ expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
+ been released earlier by a 'release' method.
+ redis_backend (FakeBackend): Database backend object containing fake databese connection.
+ """
+
+ def __init__(self, ns: str, name: str, expiration: Union[int, float],
+ redis_backend: FakeDictBackend) -> None:
+ super().__init__(ns, name)
+ self._locked = False
+ self._ns = ns
+ self._lock_name = name
+ self._lock_expiration = expiration
+ self.redis_backend = redis_backend
+
+ def __str__(self):
+ return str(
+ {
+ "lock DB type": "FAKE DB",
+ "lock namespace": self._ns,
+ "lock name": self._lock_name,
+ "lock status": self._lock_status_to_string()
+ }
+ )
+
+ def acquire(self, retry_interval: Union[int, float] = 0.1,
+ retry_timeout: Union[int, float] = 10) -> bool:
+ if self._locked:
+ return False
+ self._locked = True
+ return self._locked
+
+ def release(self) -> None:
+ self._locked = False
+
+ def refresh(self) -> None:
+ pass
+
+ def get_validity_time(self) -> Union[int, float]:
+ return self._lock_expiration
+
+ def _lock_status_to_string(self) -> str:
+ if self._locked:
+ return 'locked'
+ return 'unlocked'
def __str__(self):
return str(
{
+ "DB type": "Redis",
"Redis connection": repr(self.__redis)
}
)
def __str__(self):
return str(
{
+ "lock DB type": "Redis",
"lock namespace": self._ns,
"lock name": self._lock_name,
"lock status": self._lock_status_to_string()
"""The module provides implementation of Shared Data Layer (SDL) configurability."""
import os
+from enum import Enum
from collections import namedtuple
+class DbBackendType(Enum):
+ """Enumeration class of supported SDL database backend types."""
+ REDIS = 1
+ FAKE_DICT = 2
+
+
class _Configuration():
"""This class implements Shared Data Layer (SDL) configurability."""
Params = namedtuple('Params', ['db_host', 'db_port', 'db_sentinel_port',
- 'db_sentinel_master_name'])
+ 'db_sentinel_master_name', 'db_type'])
- def __init__(self):
- self.params = self._read_configuration()
+ def __init__(self, fake_db_backend):
+ self.params = self._read_configuration(fake_db_backend)
def __str__(self):
return str(
"DB host": self.params.db_host,
"DB port": self.params.db_port,
"DB master sentinel": self.params.db_sentinel_master_name,
- "DB sentinel port": self.params.db_sentinel_port
+ "DB sentinel port": self.params.db_sentinel_port,
+ "DB type": self.params.db_type.name,
}
)
return self.params
@classmethod
- def _read_configuration(cls):
+ def _read_configuration(cls, fake_db_backend):
+ backend_type = DbBackendType.REDIS
+ if fake_db_backend:
+ if fake_db_backend.lower() != 'dict':
+ msg = ("Configuration error: "
+ "SDL instance was initiated with wrong "
+ "'fake_db_backend' argument value: {}. "
+ "Value 'dict' is only supported.".
+ format(fake_db_backend))
+ raise ValueError(msg)
+
+ backend_type = DbBackendType.FAKE_DICT
+
return _Configuration.Params(db_host=os.getenv('DBAAS_SERVICE_HOST'),
db_port=os.getenv('DBAAS_SERVICE_PORT'),
db_sentinel_port=os.getenv('DBAAS_SERVICE_SENTINEL_PORT'),
- db_sentinel_master_name=os.getenv('DBAAS_MASTER_NAME'))
+ db_sentinel_master_name=os.getenv('DBAAS_MASTER_NAME'),
+ db_type=backend_type)
storage: 'SyncStorage') -> None:
super().__init__(ns, name, expiration)
- self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(ns, name, expiration,
+ self.__configuration = storage.get_configuration()
+ self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(self.__configuration,
+ ns, name, expiration,
storage.get_backend())
def __str__(self):
a namespace, namespace identifier is passed as a parameter to all the operations.
Args:
- None
+ fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
+ SDL instance. Fake DB backend is ONLY allowed to use for testing
+ purposes at development phase of SDL clients when more advanced
+ database services are not necessarily needed. Currently value 'dict'
+ is only allowed value for the parameter, which enables dictionary
+ type of fake DB backend.
"""
- def __init__(self) -> None:
+ def __init__(self, fake_db_backend=None) -> None:
super().__init__()
- self.__configuration = _Configuration()
+ self.__configuration = _Configuration(fake_db_backend)
self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
def __del__(self):
def get_backend(self) -> DbBackendAbc:
"""Return backend instance."""
return self.__dbbackend
+
+ def get_configuration(self) -> _Configuration:
+ """Return configuration what was valid when the SDL instance was initiated."""
+ return self.__configuration
--- /dev/null
+# Copyright (c) 2019 AT&T Intellectual Property.
+# Copyright (c) 2018-2019 Nokia.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#
+
+
+from unittest.mock import Mock
+import pytest
+import ricsdl.backend
+from ricsdl.configuration import _Configuration
+from ricsdl.configuration import DbBackendType
+
+
+@pytest.fixture()
+def fake_dict_backend_fixture(request):
+ request.cls.ns = 'some-ns'
+ request.cls.dm = {'abc': b'1', 'bcd': b'2'}
+ request.cls.new_dm = {'abc': b'3', 'bcd': b'2'}
+ request.cls.remove_dm = {'bcd': b'2'}
+ request.cls.key = 'abc'
+ request.cls.keys = ['abc', 'bcd']
+ request.cls.old_data = b'1'
+ request.cls.new_data = b'3'
+ request.cls.keypattern = r'*bc*'
+ request.cls.group = 'some-group'
+ request.cls.groupmember = b'm1'
+ request.cls.groupmembers = set([b'm1', b'm2'])
+ request.cls.new_groupmembers = set(b'm3')
+ request.cls.all_groupmembers = request.cls.groupmembers | request.cls.new_groupmembers
+
+ request.cls.configuration = Mock()
+ mock_conf_params = _Configuration.Params(db_host=None,
+ db_port=None,
+ db_sentinel_port=None,
+ db_sentinel_master_name=None,
+ db_type=DbBackendType.FAKE_DICT)
+ request.cls.configuration.get_params.return_value = mock_conf_params
+ request.cls.db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+
+
+@pytest.mark.usefixtures('fake_dict_backend_fixture')
+class TestFakeDictBackend:
+ def test_set_function_success(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.get(self.ns, self.keys)
+ assert ret == self.dm
+
+ def test_set_if_function_success(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
+ assert ret is True
+ ret = self.db.get(self.ns, self.keys)
+ assert ret == self.new_dm
+
+ def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
+ self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
+ self.db.set(self.ns, self.new_dm)
+ ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
+ assert ret is False
+
+ def test_set_if_not_exists_function_success(self):
+ ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
+ assert ret is True
+ ret = self.db.get(self.ns, self.keys)
+ assert ret == {self.key: self.new_data}
+
+ def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
+ assert ret is False
+
+ def test_find_keys_function_success(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.find_keys(self.ns, self.keypattern)
+ assert ret == self.keys
+
+ def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
+ ret = self.db.find_keys(self.ns, self.keypattern)
+ assert ret == []
+
+ def test_find_and_get_function_success(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.find_and_get(self.ns, self.keypattern)
+ assert ret == self.dm
+
+ def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
+ ret = self.db.find_and_get(self.ns, self.keypattern)
+ assert ret == dict()
+
+ def test_remove_function_success(self):
+ self.db.set(self.ns, self.dm)
+ self.db.remove(self.ns, self.keys)
+ ret = self.db.get(self.ns, self.keys)
+ assert ret == dict()
+
+ def test_remove_if_function_success(self):
+ self.db.set(self.ns, self.dm)
+ ret = self.db.remove_if(self.ns, self.key, self.old_data)
+ assert ret is True
+ ret = self.db.get(self.ns, self.keys)
+ assert ret == self.remove_dm
+
+ def test_remove_if_function_returns_false_if_data_does_not_match(self):
+ ret = self.db.remove_if(self.ns, self.key, self.old_data)
+ assert ret is False
+ self.db.set(self.ns, self.dm)
+ ret = self.db.remove_if(self.ns, self.key, self.new_data)
+ assert ret is False
+
+ def test_add_member_function_success(self):
+ self.db.add_member(self.ns, self.group, self.groupmembers)
+ ret = self.db.get_members(self.ns, self.group)
+ assert ret == self.groupmembers
+
+ self.db.add_member(self.ns, self.group, self.new_groupmembers)
+ ret = self.db.get_members(self.ns, self.group)
+ assert ret == self.all_groupmembers
+
+ def test_remove_member_function_success(self):
+ self.db.remove_member(self.ns, self.group, self.groupmembers)
+ self.db.add_member(self.ns, self.group, self.groupmembers)
+ self.db.remove_member(self.ns, self.group, self.groupmembers)
+ ret = self.db.get_members(self.ns, self.group)
+ assert ret == set()
+
+ def test_remove_group_function_success(self):
+ self.db.remove_group(self.ns, self.group)
+ ret = self.db.get_members(self.ns, self.group)
+ assert ret == set()
+
+ def test_is_member_function_success(self):
+ ret = self.db.is_member(self.ns, self.group, b'not member')
+ assert ret is False
+ self.db.add_member(self.ns, self.group, self.groupmembers)
+ ret = self.db.is_member(self.ns, self.group, self.groupmember)
+ assert ret is True
+ ret = self.db.is_member(self.ns, self.group, b'not member')
+ assert ret is False
+
+ def test_group_size_function_success(self):
+ ret = self.db.group_size(self.ns, self.group)
+ assert ret == 0
+ self.db.add_member(self.ns, self.group, self.groupmembers)
+ ret = self.db.group_size(self.ns, self.group)
+ assert ret == len(self.groupmembers)
+
+ def test_fake_dict_backend_object_string_representation(self):
+ assert str(self.db) == str({'DB type': 'FAKE DB'})
+
+@pytest.fixture()
+def fake_dict_backend_lock_fixture(request):
+ request.cls.ns = 'some-ns'
+ request.cls.lockname = 'some-lock-name'
+ request.cls.expiration = 10
+ request.cls.retry_interval = 0.1
+ request.cls.retry_timeout = 1
+
+ request.cls.configuration = Mock()
+ mock_conf_params = _Configuration.Params(db_host=None,
+ db_port=None,
+ db_sentinel_port=None,
+ db_sentinel_master_name=None,
+ db_type=DbBackendType.FAKE_DICT)
+ request.cls.configuration.get_params.return_value = mock_conf_params
+ request.cls.lock = ricsdl.backend.get_backend_lock_instance(request.cls.configuration,
+ request.cls.ns,
+ request.cls.lockname,
+ request.cls.expiration,
+ Mock())
+
+
+@pytest.mark.usefixtures('fake_dict_backend_lock_fixture')
+class TestFakeDictBackendLock:
+ def test_acquire_function_success(self):
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
+ assert ret is True
+
+ def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
+ self.lock.acquire(self.retry_interval, self.retry_timeout)
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
+ assert ret is False
+
+ def test_release_function_success(self):
+ self.lock.acquire(self.retry_interval, self.retry_timeout)
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
+ assert ret is False
+ self.lock.release()
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
+ assert ret is True
+
+ def test_get_validity_time_function_success(self):
+ ret = self.lock.get_validity_time()
+ assert ret == self.expiration
+
+ def test_fake_dict_backend_lock_object_string_representation(self):
+ expected_lock_info = {'lock DB type': 'FAKE DB',
+ 'lock namespace': 'some-ns',
+ 'lock name': 'some-lock-name',
+ 'lock status': 'unlocked'}
+ assert str(self.lock) == str(expected_lock_info)
import ricsdl.backend
from ricsdl.backend.redis import (RedisBackendLock, _map_to_sdl_exception)
from ricsdl.configuration import _Configuration
+from ricsdl.configuration import DbBackendType
import ricsdl.exceptions
mock_conf_params = _Configuration.Params(db_host=None,
db_port=None,
db_sentinel_port=None,
- db_sentinel_master_name=None)
+ db_sentinel_master_name=None,
+ db_type=DbBackendType.REDIS)
request.cls.configuration.get_params.return_value = mock_conf_params
with patch('ricsdl.backend.redis.Redis') as mock_redis:
db = ricsdl.backend.get_backend_instance(request.cls.configuration)
self.new_data, self.old_data)
assert ret is True
- def test_set_if_function_returns_false_if_same_data_already_exists(self):
+ def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
self.mock_redis.execute_command.return_value = False
ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
self.mock_redis.execute_command.assert_called_once_with('SETIE', self.key_redis,
mocked_dbbackend = Mock()
mocked_dbbackend.get_redis_connection.return_value = request.cls.mock_redis
+
+ request.cls.configuration = Mock()
+ mock_conf_params = _Configuration.Params(db_host=None,
+ db_port=None,
+ db_sentinel_port=None,
+ db_sentinel_master_name=None,
+ db_type=DbBackendType.REDIS)
+ request.cls.configuration.get_params.return_value = mock_conf_params
+
with patch('ricsdl.backend.redis.Lock') as mock_redis_lock:
- lock = ricsdl.backend.get_backend_lock_instance(request.cls.ns, request.cls.lockname,
+ lock = ricsdl.backend.get_backend_lock_instance(request.cls.configuration,
+ request.cls.ns, request.cls.lockname,
request.cls.expiration, mocked_dbbackend)
request.cls.mock_redis_lock = mock_redis_lock.return_value
request.cls.lock = lock
@pytest.mark.usefixtures('redis_backend_lock_fixture')
class TestRedisBackendLock:
def test_acquire_function_success(self):
- self.lock.acquire(self.retry_interval, self.retry_timeout)
+ self.mock_redis_lock.acquire.return_value = True
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
self.mock_redis_lock.acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
+ assert ret is True
+
+ def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
+ self.mock_redis_lock.acquire.return_value = False
+ ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
+ self.mock_redis_lock.acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
+ assert ret is False
def test_acquire_function_can_map_redis_exception_to_sdl_exception(self):
self.mock_redis_lock.acquire.side_effect = redis_exceptions.LockError('redis lock error!')
assert f"Getting validity time of a lock {self.lockname} failed with error code: -10" in str(excinfo.value)
def test_redis_backend_lock_object_string_representation(self):
- expected_lock_info = {'lock namespace': 'some-ns',
+ expected_lock_info = {'lock DB type': 'Redis',
+ 'lock namespace': 'some-ns',
'lock name': 'some-lock-name',
'lock status': 'locked'}
assert str(self.lock) == str(expected_lock_info)
def test_redis_backend_lock_object_string_representation_can_catch_redis_exception(self):
self.mock_redis_lock.owned.side_effect = redis_exceptions.LockError('redis lock error!')
- expected_lock_info = {'lock namespace': 'some-ns',
+ expected_lock_info = {'lock DB type': 'Redis',
+ 'lock namespace': 'some-ns',
'lock name': 'some-lock-name',
'lock status': 'Error: redis lock error!'}
assert str(self.lock) == str(expected_lock_info)
import pytest
from ricsdl.configuration import _Configuration
+from ricsdl.configuration import DbBackendType
@pytest.fixture()
monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000')
monkeypatch.setenv('DBAAS_SERVICE_SENTINEL_PORT', '11000')
monkeypatch.setenv('DBAAS_MASTER_NAME', 'my-master')
- request.cls.config = _Configuration()
- request.cls.config._read_configuration()
+ request.cls.config = _Configuration(fake_db_backend=None)
@pytest.fixture
-def config_missing_fixture(request, monkeypatch):
+def fake_db_config_fixture(request, monkeypatch):
monkeypatch.delenv('DBAAS_SERVICE_HOST', raising=False)
monkeypatch.delenv('DBAAS_SERVICE_PORT', raising=False)
monkeypatch.delenv('DBAAS_SERVICE_SENTINEL_PORT', raising=False)
monkeypatch.delenv('DBAAS_MASTER_NAME', raising=False)
- request.cls.config = _Configuration()
- request.cls.config._read_configuration()
+ request.cls.config = _Configuration(fake_db_backend='dict')
class TestConfiguration:
def test_get_params_function_returns_read_configuration(self, config_fixture):
expected_config = _Configuration.Params(db_host='10.20.30.40', db_port='10000',
db_sentinel_port='11000',
- db_sentinel_master_name='my-master')
+ db_sentinel_master_name='my-master',
+ db_type=DbBackendType.REDIS)
assert expected_config == self.config.get_params()
- def test_get_params_function_can_return_empty_configuration(self, config_missing_fixture):
+ def test_get_params_function_can_return_fake_db_configuration(self, fake_db_config_fixture):
expected_config = _Configuration.Params(db_host=None, db_port=None,
db_sentinel_port=None,
- db_sentinel_master_name=None)
+ db_sentinel_master_name=None,
+ db_type=DbBackendType.FAKE_DICT)
assert expected_config == self.config.get_params()
+ def test_get_params_function_can_raise_exception_if_wrong_fake_db_type(self):
+ with pytest.raises(ValueError, match=r"Configuration error"):
+ _Configuration(fake_db_backend='bad value')
+
+
def test_configuration_object_string_representation(self, config_fixture):
expected_config_info = {'DB host': '10.20.30.40',
'DB port': '10000',
'DB master sentinel': 'my-master',
- 'DB sentinel port': '11000'}
+ 'DB sentinel port': '11000',
+ 'DB type': 'REDIS'}
assert str(self.config) == str(expected_config_info)
- def test_configuration_object_string_representation_if_no_config(self, config_missing_fixture):
+ def test_configuration_object_string_representation_if_fake_db(self, fake_db_config_fixture):
expected_config_info = {'DB host': None,
'DB port': None,
'DB master sentinel': None,
- 'DB sentinel port': None}
+ 'DB sentinel port': None,
+ 'DB type': 'FAKE_DICT'}
assert str(self.config) == str(expected_config_info)