Fix INF-381 another issue cause this case still failed
[pti/o2.git] / o2ims / domain / ocloud_repo.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import abc
16 from typing import List, Set, Tuple
17 from o2ims.domain import ocloud
18
19
20 class OcloudRepository(abc.ABC):
21     def __init__(self):
22         self.seen = set()  # type: Set[ocloud.Ocloud]
23
24     def add(self, ocloud: ocloud.Ocloud):
25         self._add(ocloud)
26         self.seen.add(ocloud)
27
28     def get(self, ocloud_id) -> ocloud.Ocloud:
29         ocloud = self._get(ocloud_id)
30         if ocloud:
31             self.seen.add(ocloud)
32         return ocloud
33
34     def list(self) -> List[ocloud.Ocloud]:
35         return self._list()
36
37     def update(self, ocloud: ocloud.Ocloud):
38         self._update(ocloud)
39         self.seen.add(ocloud)
40
41     # def update_fields(self, ocloudid: str, updatefields: dict):
42     #     self._update(ocloudid, updatefields)
43
44     @abc.abstractmethod
45     def _add(self, ocloud: ocloud.Ocloud):
46         raise NotImplementedError
47
48     @abc.abstractmethod
49     def _get(self, ocloud_id) -> ocloud.Ocloud:
50         raise NotImplementedError
51
52     @abc.abstractmethod
53     def _update(self, ocloud: ocloud.Ocloud):
54         raise NotImplementedError
55
56
57 class ResourceTypeRepository(abc.ABC):
58     def __init__(self):
59         self.seen = set()  # type: Set[ocloud.ResourceType]
60
61     def add(self, resource_type: ocloud.ResourceType):
62         self._add(resource_type)
63         self.seen.add(resource_type)
64
65     def get(self, resource_type_id) -> ocloud.ResourceType:
66         resource_type = self._get(resource_type_id)
67         if resource_type:
68             self.seen.add(resource_type)
69         return resource_type
70
71     def get_by_name(self, resource_type_name) -> ocloud.ResourceType:
72         resource_type = self._get_by_name(resource_type_name)
73         if resource_type:
74             self.seen.add(resource_type)
75         return resource_type
76
77     def list(self, *args) -> List[ocloud.ResourceType]:
78         return self._list(*args)[1]
79
80     def list_with_count(self, *args, **kwargs) -> \
81             Tuple[int, List[ocloud.ResourceType]]:
82         return self._list(*args, **kwargs)
83
84     def update(self, resource_type: ocloud.ResourceType):
85         self._update(resource_type)
86         self.seen.add(resource_type)
87
88     @abc.abstractmethod
89     def _add(self, resource_type: ocloud.ResourceType):
90         raise NotImplementedError
91
92     @abc.abstractmethod
93     def _get(self, resource_type_id) -> ocloud.ResourceType:
94         raise NotImplementedError
95
96     @abc.abstractmethod
97     def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
98         raise NotImplementedError
99
100     @abc.abstractmethod
101     def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourceType]]:
102         raise NotImplementedError
103
104     @abc.abstractmethod
105     def _update(self, resource_type: ocloud.ResourceType):
106         raise NotImplementedError
107
108
109 class ResourcePoolRepository(abc.ABC):
110     def __init__(self):
111         self.seen = set()  # type: Set[ocloud.ResourcePool]
112
113     def add(self, resource_pool: ocloud.ResourcePool):
114         self._add(resource_pool)
115         self.seen.add(resource_pool)
116
117     def get(self, resource_pool_id) -> ocloud.ResourcePool:
118         resource_pool = self._get(resource_pool_id)
119         if resource_pool:
120             self.seen.add(resource_pool)
121         return resource_pool
122
123     def list(self, *args) -> List[ocloud.ResourcePool]:
124         return self._list(*args)[1]
125
126     def list_with_count(self, *args, **kwargs) -> \
127             Tuple[int, List[ocloud.ResourcePool]]:
128         return self._list(*args, **kwargs)
129
130     def update(self, resource_pool: ocloud.ResourcePool):
131         self._update(resource_pool)
132         self.seen.add(resource_pool)
133
134     @abc.abstractmethod
135     def _add(self, resource_pool: ocloud.ResourcePool):
136         raise NotImplementedError
137
138     @abc.abstractmethod
139     def _get(self, resource_pool_id) -> ocloud.ResourcePool:
140         raise NotImplementedError
141
142     @abc.abstractmethod
143     def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourcePool]]:
144         raise NotImplementedError
145
146     @abc.abstractmethod
147     def _update(self, resource_pool: ocloud.ResourcePool):
148         raise NotImplementedError
149
150
151 class ResourceRepository(abc.ABC):
152     def __init__(self):
153         self.seen = set()  # type: Set[ocloud.Resource]
154
155     def add(self, resource: ocloud.Resource):
156         self._add(resource)
157         self.seen.add(resource)
158
159     def get(self, resource_id) -> ocloud.Resource:
160         resource = self._get(resource_id)
161         if resource:
162             self.seen.add(resource)
163         return resource
164
165     def list(self, resourcepool_id, *args) -> List[ocloud.Resource]:
166         return self._list(resourcepool_id, *args)[1]
167
168     def list_with_count(self, resourcepool_id, *args, **kwargs) -> \
169             Tuple[int, List[ocloud.Resource]]:
170         return self._list(resourcepool_id, *args, **kwargs)
171
172     def update(self, resource: ocloud.Resource):
173         self._update(resource)
174         self.seen.add(resource)
175
176     @abc.abstractmethod
177     def _add(self, resource: ocloud.Resource):
178         raise NotImplementedError
179
180     @abc.abstractmethod
181     def _get(self, resource_id) -> ocloud.Resource:
182         raise NotImplementedError
183
184     @abc.abstractmethod
185     def _list(self, resourcepool_id, **kwargs) -> \
186             Tuple[int, List[ocloud.Resource]]:
187         raise NotImplementedError
188
189     @abc.abstractmethod
190     def _update(self, resource: ocloud.Resource):
191         raise NotImplementedError
192
193
194 class DeploymentManagerRepository(abc.ABC):
195     def __init__(self):
196         self.seen = set()  # type: Set[ocloud.DeploymentManager]
197
198     def add(self, deployment_manager: ocloud.DeploymentManager):
199         self._add(deployment_manager)
200         self.seen.add(deployment_manager)
201
202     def get(self, deployment_manager_id) -> ocloud.DeploymentManager:
203         deployment_manager = self._get(deployment_manager_id)
204         if deployment_manager:
205             self.seen.add(deployment_manager)
206         return deployment_manager
207
208     def list(self, *args) -> List[ocloud.DeploymentManager]:
209         return self._list(*args)[1]
210
211     def list_with_count(self, *args, **kwargs) -> \
212             Tuple[int, List[ocloud.DeploymentManager]]:
213         return self._list(*args, **kwargs)
214
215     def update(self, deployment_manager: ocloud.DeploymentManager):
216         self._update(deployment_manager)
217
218     @abc.abstractmethod
219     def _add(self, deployment_manager: ocloud.DeploymentManager):
220         raise NotImplementedError
221
222     @abc.abstractmethod
223     def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
224         raise NotImplementedError
225
226     @abc.abstractmethod
227     def _list(self, **kwargs) -> Tuple[int, List[ocloud.DeploymentManager]]:
228         raise NotImplementedError
229
230     @abc.abstractmethod
231     def _update(self, deployment_manager: ocloud.DeploymentManager):
232         raise NotImplementedError