1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import abc
from typing import List, Set, Tuple

from o2ims.domain import ocloud
class OcloudRepository(abc.ABC):
    """Abstract repository for ``ocloud.Ocloud`` objects.

    The public methods delegate to the underscore-prefixed hooks, which
    concrete subclasses must implement, and record every object that passes
    through ``add``, ``get`` or ``update`` in ``self.seen``.
    """

    def __init__(self):
        self.seen = set()  # type: Set[ocloud.Ocloud]

    def add(self, ocloud: ocloud.Ocloud):
        """Store *ocloud* through ``_add`` and mark it as seen."""
        self._add(ocloud)
        self.seen.add(ocloud)

    def get(self, ocloud_id) -> ocloud.Ocloud:
        """Return the result of ``_get(ocloud_id)``.

        A truthy result is added to ``self.seen``; a falsy one (for example
        ``None``) is returned unchanged and not tracked.
        """
        found = self._get(ocloud_id)
        if found:
            self.seen.add(found)
        return found

    def list(self, *args) -> List[ocloud.Ocloud]:
        """Return whatever ``_list`` produces for the given arguments."""
        return self._list(*args)

    def update(self, ocloud: ocloud.Ocloud):
        """Persist *ocloud* through ``_update`` and mark it as seen."""
        self._update(ocloud)
        self.seen.add(ocloud)

    # def update_fields(self, ocloudid: str, updatefields: dict):
    #     self._update(ocloudid, updatefields)

    @abc.abstractmethod
    def _add(self, ocloud: ocloud.Ocloud):
        """Storage hook behind ``add``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, ocloud_id) -> ocloud.Ocloud:
        """Storage hook behind ``get``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _list(self, *args) -> List[ocloud.Ocloud]:
        """Storage hook behind ``list``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, ocloud: ocloud.Ocloud):
        """Storage hook behind ``update``."""
        raise NotImplementedError
class ResourceTypeRepository(abc.ABC):
    """Abstract repository for ``ocloud.ResourceType`` objects.

    The public methods delegate to the underscore-prefixed hooks, which
    concrete subclasses must implement, and record every object that passes
    through ``add``, ``get``, ``get_by_name`` or ``update`` in ``self.seen``.
    """

    def __init__(self):
        self.seen = set()  # type: Set[ocloud.ResourceType]

    def add(self, resource_type: ocloud.ResourceType):
        """Store *resource_type* through ``_add`` and mark it as seen."""
        self._add(resource_type)
        self.seen.add(resource_type)

    def get(self, resource_type_id) -> ocloud.ResourceType:
        """Return the result of ``_get(resource_type_id)``.

        A truthy result is added to ``self.seen``; a falsy one (for example
        ``None``) is returned unchanged and not tracked.
        """
        found = self._get(resource_type_id)
        if found:
            self.seen.add(found)
        return found

    def get_by_name(self, resource_type_name) -> ocloud.ResourceType:
        """Return the result of ``_get_by_name(resource_type_name)``.

        A truthy result is added to ``self.seen``; a falsy one is returned
        unchanged and not tracked.
        """
        found = self._get_by_name(resource_type_name)
        if found:
            self.seen.add(found)
        return found

    def list(self, *args) -> List[ocloud.ResourceType]:
        """Return only the item list (second element) of ``_list``."""
        return self._list(*args)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourceType]]:
        """Return the full ``(count, items)`` tuple produced by ``_list``."""
        return self._list(*args, **kwargs)

    def update(self, resource_type: ocloud.ResourceType):
        """Persist *resource_type* through ``_update`` and mark it as seen."""
        self._update(resource_type)
        self.seen.add(resource_type)

    @abc.abstractmethod
    def _add(self, resource_type: ocloud.ResourceType):
        """Storage hook behind ``add``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_type_id) -> ocloud.ResourceType:
        """Storage hook behind ``get``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
        """Storage hook behind ``get_by_name``."""
        raise NotImplementedError

    # Accepts positional arguments as well, because ``list`` and
    # ``list_with_count`` forward ``*args`` to this hook.
    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourceType]]:
        """Storage hook behind ``list``/``list_with_count``.

        Must return a ``(count, items)`` tuple.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource_type: ocloud.ResourceType):
        """Storage hook behind ``update``."""
        raise NotImplementedError
class ResourcePoolRepository(abc.ABC):
    """Abstract repository for ``ocloud.ResourcePool`` objects.

    The public methods delegate to the underscore-prefixed hooks, which
    concrete subclasses must implement, and record every object that passes
    through ``add``, ``get`` or ``update`` in ``self.seen``.
    """

    def __init__(self):
        self.seen = set()  # type: Set[ocloud.ResourcePool]

    def add(self, resource_pool: ocloud.ResourcePool):
        """Store *resource_pool* through ``_add`` and mark it as seen."""
        self._add(resource_pool)
        self.seen.add(resource_pool)

    def get(self, resource_pool_id) -> ocloud.ResourcePool:
        """Return the result of ``_get(resource_pool_id)``.

        A truthy result is added to ``self.seen``; a falsy one (for example
        ``None``) is returned unchanged and not tracked.
        """
        found = self._get(resource_pool_id)
        if found:
            self.seen.add(found)
        return found

    def list(self, *args) -> List[ocloud.ResourcePool]:
        """Return only the item list (second element) of ``_list``."""
        return self._list(*args)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourcePool]]:
        """Return the full ``(count, items)`` tuple produced by ``_list``."""
        return self._list(*args, **kwargs)

    def update(self, resource_pool: ocloud.ResourcePool):
        """Persist *resource_pool* through ``_update`` and mark it as seen."""
        self._update(resource_pool)
        self.seen.add(resource_pool)

    @abc.abstractmethod
    def _add(self, resource_pool: ocloud.ResourcePool):
        """Storage hook behind ``add``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_pool_id) -> ocloud.ResourcePool:
        """Storage hook behind ``get``."""
        raise NotImplementedError

    # Accepts positional arguments as well, because ``list`` and
    # ``list_with_count`` forward ``*args`` to this hook.
    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourcePool]]:
        """Storage hook behind ``list``/``list_with_count``.

        Must return a ``(count, items)`` tuple.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource_pool: ocloud.ResourcePool):
        """Storage hook behind ``update``."""
        raise NotImplementedError
class ResourceRepository(abc.ABC):
    """Abstract repository for ``ocloud.Resource`` objects.

    The public methods delegate to the underscore-prefixed hooks, which
    concrete subclasses must implement, and record every object that passes
    through ``add``, ``get`` or ``update`` in ``self.seen``. Listing is
    always scoped by a resource pool id.
    """

    def __init__(self):
        self.seen = set()  # type: Set[ocloud.Resource]

    def add(self, resource: ocloud.Resource):
        """Store *resource* through ``_add`` and mark it as seen."""
        self._add(resource)
        self.seen.add(resource)

    def get(self, resource_id) -> ocloud.Resource:
        """Return the result of ``_get(resource_id)``.

        A truthy result is added to ``self.seen``; a falsy one (for example
        ``None``) is returned unchanged and not tracked.
        """
        found = self._get(resource_id)
        if found:
            self.seen.add(found)
        return found

    def list(self, resourcepool_id, *args) -> List[ocloud.Resource]:
        """Return only the item list (second element) of ``_list``."""
        return self._list(resourcepool_id, *args)[1]

    def list_with_count(self, resourcepool_id, *args, **kwargs) -> \
            Tuple[int, List[ocloud.Resource]]:
        """Return the full ``(count, items)`` tuple produced by ``_list``."""
        return self._list(resourcepool_id, *args, **kwargs)

    def update(self, resource: ocloud.Resource):
        """Persist *resource* through ``_update`` and mark it as seen."""
        self._update(resource)
        self.seen.add(resource)

    @abc.abstractmethod
    def _add(self, resource: ocloud.Resource):
        """Storage hook behind ``add``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_id) -> ocloud.Resource:
        """Storage hook behind ``get``."""
        raise NotImplementedError

    # Accepts extra positional arguments as well, because ``list`` and
    # ``list_with_count`` forward ``*args`` to this hook.
    @abc.abstractmethod
    def _list(self, resourcepool_id, *args, **kwargs) -> \
            Tuple[int, List[ocloud.Resource]]:
        """Storage hook behind ``list``/``list_with_count``.

        Must return a ``(count, items)`` tuple.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource: ocloud.Resource):
        """Storage hook behind ``update``."""
        raise NotImplementedError
class DeploymentManagerRepository(abc.ABC):
    """Abstract repository for ``ocloud.DeploymentManager`` objects.

    The public methods delegate to the underscore-prefixed hooks, which
    concrete subclasses must implement, and record every object that passes
    through ``add``, ``get`` or ``update`` in ``self.seen``.
    """

    def __init__(self):
        self.seen = set()  # type: Set[ocloud.DeploymentManager]

    def add(self, deployment_manager: ocloud.DeploymentManager):
        """Store *deployment_manager* through ``_add`` and mark it as seen."""
        self._add(deployment_manager)
        self.seen.add(deployment_manager)

    def get(self, deployment_manager_id) -> ocloud.DeploymentManager:
        """Return the result of ``_get(deployment_manager_id)``.

        A truthy result is added to ``self.seen``; a falsy one (for example
        ``None``) is returned unchanged and not tracked.
        """
        found = self._get(deployment_manager_id)
        if found:
            self.seen.add(found)
        return found

    def list(self, *args) -> List[ocloud.DeploymentManager]:
        """Return only the item list (second element) of ``_list``."""
        return self._list(*args)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.DeploymentManager]]:
        """Return the full ``(count, items)`` tuple produced by ``_list``."""
        return self._list(*args, **kwargs)

    def update(self, deployment_manager: ocloud.DeploymentManager):
        """Persist *deployment_manager* through ``_update``; mark it seen."""
        self._update(deployment_manager)
        # Track updated objects the same way the other repositories in this
        # module do in their ``update`` methods.
        self.seen.add(deployment_manager)

    @abc.abstractmethod
    def _add(self, deployment_manager: ocloud.DeploymentManager):
        """Storage hook behind ``add``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
        """Storage hook behind ``get``."""
        raise NotImplementedError

    # Accepts positional arguments as well, because ``list`` and
    # ``list_with_count`` forward ``*args`` to this hook.
    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.DeploymentManager]]:
        """Storage hook behind ``list``/``list_with_count``.

        Must return a ``(count, items)`` tuple.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, deployment_manager: ocloud.DeploymentManager):
        """Storage hook behind ``update``."""
        raise NotImplementedError