Fix fake SDL database backend to support multiple set calls
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / backend / fake_dict_db.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
23 import fnmatch
24 from typing import (Dict, Set, List, Union)
25 from ricsdl.configuration import _Configuration
26 from .dbbackend_abc import DbBackendAbc
27 from .dbbackend_abc import DbBackendLockAbc
28
29
30 class FakeDictBackend(DbBackendAbc):
31     """
32     A class providing fake implementation of database backend of Shared Data Layer (SDL).
33     This class does not provide working database solution, this class can be used in testing
34     purposes only. Implementation does not provide shared database resource, SDL client sees
35     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
36     stored in database under the same namespace.
37
38     Args:
39         configuration (_Configuration): SDL configuration, containing credentials to connect to
40                                         Redis database backend.
41     """
42     def __init__(self, configuration: _Configuration) -> None:
43         super().__init__()
44         self._db = {}
45         self._configuration = configuration
46
47     def __str__(self):
48         return str(
49             {
50                 "DB type": "FAKE DB",
51             }
52         )
53
54     def close(self):
55         pass
56
57     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
58         self._db.update(data_map.copy())
59
60     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
61         if key not in self._db:
62             return False
63         db_data = self._db[key]
64         if db_data == old_data:
65             self._db[key] = new_data
66             return True
67         return False
68
69     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
70         if key not in self._db:
71             self._db[key] = data
72             return True
73         return False
74
75     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
76         ret = {}
77         for k in keys:
78             if k in self._db:
79                 ret[k] = self._db[k]
80         return ret
81
82     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
83         ret = []
84         for k in self._db:
85             if fnmatch.fnmatch(k, key_pattern):
86                 ret.append(k)
87         return ret
88
89     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
90         ret = {}
91         for key, val in self._db.items():
92             if fnmatch.fnmatch(key, key_pattern):
93                 ret[key] = val
94         return ret
95
96     def remove(self, ns: str, keys: List[str]) -> None:
97         for key in keys:
98             self._db.pop(key, None)
99
100     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
101         if key in self._db:
102             db_data = self._db[key]
103             if db_data == data:
104                 self._db.pop(key)
105                 return True
106         return False
107
108     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
109         if group in self._db:
110             self._db[group] = self._db[group] | members.copy()
111         else:
112             self._db[group] = members.copy()
113
114     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
115         if group not in self._db:
116             return
117         for member in members:
118             self._db[group].discard(member)
119
120     def remove_group(self, ns: str, group: str) -> None:
121         self._db.pop(group, None)
122
123     def get_members(self, ns: str, group: str) -> Set[bytes]:
124         return self._db.get(group, set())
125
126     def is_member(self, ns: str, group: str, member: bytes) -> bool:
127         if group not in self._db:
128             return False
129         if member in self._db[group]:
130             return True
131         return False
132
133     def group_size(self, ns: str, group: str) -> int:
134         if group not in self._db:
135             return 0
136         return len(self._db[group])
137
138
139 class FakeDictBackendLock(DbBackendLockAbc):
140     """
141     A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
142     This class does not provide working database solution, this class can be used in testing
143     purposes only. Implementation does not provide shared database resource, SDL client sees
144     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
145     stored in database under the same namespace.
146     Args:
147         ns (str): Namespace under which this lock is targeted.
148         name (str): Lock name, identifies the lock key in a Redis database backend.
149         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
150                                  been released earlier by a 'release' method.
151         redis_backend (FakeBackend): Database backend object containing fake databese connection.
152     """
153
154     def __init__(self, ns: str, name: str, expiration: Union[int, float],
155                  redis_backend: FakeDictBackend) -> None:
156         super().__init__(ns, name)
157         self._locked = False
158         self._ns = ns
159         self._lock_name = name
160         self._lock_expiration = expiration
161         self.redis_backend = redis_backend
162
163     def __str__(self):
164         return str(
165             {
166                 "lock DB type": "FAKE DB",
167                 "lock namespace": self._ns,
168                 "lock name": self._lock_name,
169                 "lock status": self._lock_status_to_string()
170             }
171         )
172
173     def acquire(self, retry_interval: Union[int, float] = 0.1,
174                 retry_timeout: Union[int, float] = 10) -> bool:
175         if self._locked:
176             return False
177         self._locked = True
178         return self._locked
179
180     def release(self) -> None:
181         self._locked = False
182
183     def refresh(self) -> None:
184         pass
185
186     def get_validity_time(self) -> Union[int, float]:
187         return self._lock_expiration
188
189     def _lock_status_to_string(self) -> str:
190         if self._locked:
191             return 'locked'
192         return 'unlocked'