1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing import List, Set, Tuple
17 from o2ims.domain import ocloud
class OcloudRepository(abc.ABC):
    """Abstract repository for ``ocloud.Ocloud`` entities.

    The public methods delegate to underscore-prefixed hooks supplied by a
    concrete subclass and record the entities they handle in ``seen``.
    """

    def __init__(self):
        # Entities handled by add/get/update on this repository instance.
        self.seen = set()  # type: Set[ocloud.Ocloud]

    def add(self, ocloud: ocloud.Ocloud):
        """Store ``ocloud`` through ``_add`` and record it in ``seen``."""
        self._add(ocloud)
        self.seen.add(ocloud)

    def get(self, ocloud_id) -> ocloud.Ocloud:
        """Return whatever ``_get`` yields for ``ocloud_id``.

        A truthy result is recorded in ``seen``; a falsy result (for example
        ``None``) is returned unchanged and not recorded.
        """
        # Use a distinct local name so the imported ``ocloud`` module is not
        # shadowed inside this method.
        found = self._get(ocloud_id)
        if found:
            self.seen.add(found)
        return found

    def list(self) -> List[ocloud.Ocloud]:
        """Return the result of the ``_list`` hook."""
        return self._list()

    def update(self, ocloud: ocloud.Ocloud):
        """Update ``ocloud`` through ``_update`` and record it in ``seen``."""
        self._update(ocloud)
        self.seen.add(ocloud)

    # def update_fields(self, ocloudid: str, updatefields: dict):
    #     self._update(ocloudid, updatefields)

    @abc.abstractmethod
    def _add(self, ocloud: ocloud.Ocloud):
        """Hook behind ``add``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, ocloud_id) -> ocloud.Ocloud:
        """Hook behind ``get``; must be implemented by subclasses."""
        raise NotImplementedError

    def _list(self) -> List[ocloud.Ocloud]:
        """Hook behind ``list``; subclasses are expected to override it.

        Deliberately not marked abstract, so existing subclasses that never
        defined it can still be instantiated.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, ocloud: ocloud.Ocloud):
        """Hook behind ``update``; must be implemented by subclasses."""
        raise NotImplementedError
class ResourceTypeRepository(abc.ABC):
    """Abstract repository for ``ocloud.ResourceType`` entities.

    The public methods delegate to underscore-prefixed hooks supplied by a
    concrete subclass and record the entities they handle in ``seen``.
    """

    def __init__(self):
        # Entities handled by add/get/get_by_name/update on this instance.
        self.seen = set()  # type: Set[ocloud.ResourceType]

    def add(self, resource_type: ocloud.ResourceType):
        """Store ``resource_type`` through ``_add`` and record it in ``seen``."""
        self._add(resource_type)
        self.seen.add(resource_type)

    def get(self, resource_type_id) -> ocloud.ResourceType:
        """Return whatever ``_get`` yields for ``resource_type_id``.

        A truthy result is recorded in ``seen``; a falsy result (for example
        ``None``) is returned unchanged and not recorded.
        """
        resource_type = self._get(resource_type_id)
        if resource_type:
            self.seen.add(resource_type)
        return resource_type

    def get_by_name(self, resource_type_name) -> ocloud.ResourceType:
        """Return whatever ``_get_by_name`` yields for ``resource_type_name``.

        A truthy result is recorded in ``seen``; a falsy result is returned
        unchanged and not recorded.
        """
        resource_type = self._get_by_name(resource_type_name)
        if resource_type:
            self.seen.add(resource_type)
        return resource_type

    def list(self, **kwargs) -> List[ocloud.ResourceType]:
        """Return element ``[1]`` (the entity list) of the ``_list`` result.

        ``kwargs`` are forwarded unchanged to ``_list``.
        """
        return self._list(**kwargs)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourceType]]:
        """Return the ``_list`` result as is.

        Positional and keyword arguments are forwarded unchanged to ``_list``.
        """
        return self._list(*args, **kwargs)

    def update(self, resource_type: ocloud.ResourceType):
        """Update ``resource_type`` through ``_update`` and record it."""
        self._update(resource_type)
        self.seen.add(resource_type)

    @abc.abstractmethod
    def _add(self, resource_type: ocloud.ResourceType):
        """Hook behind ``add``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_type_id) -> ocloud.ResourceType:
        """Hook behind ``get``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
        """Hook behind ``get_by_name``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourceType]]:
        """Hook behind ``list`` and ``list_with_count``.

        Accepts positional arguments as well as keywords because
        ``list_with_count`` forwards both.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource_type: ocloud.ResourceType):
        """Hook behind ``update``; must be implemented by subclasses."""
        raise NotImplementedError
class ResourcePoolRepository(abc.ABC):
    """Abstract repository for ``ocloud.ResourcePool`` entities.

    The public methods delegate to underscore-prefixed hooks supplied by a
    concrete subclass and record the entities they handle in ``seen``.
    """

    def __init__(self):
        # Entities handled by add/get/update on this repository instance.
        self.seen = set()  # type: Set[ocloud.ResourcePool]

    def add(self, resource_pool: ocloud.ResourcePool):
        """Store ``resource_pool`` through ``_add`` and record it in ``seen``."""
        self._add(resource_pool)
        self.seen.add(resource_pool)

    def get(self, resource_pool_id) -> ocloud.ResourcePool:
        """Return whatever ``_get`` yields for ``resource_pool_id``.

        A truthy result is recorded in ``seen``; a falsy result (for example
        ``None``) is returned unchanged and not recorded.
        """
        resource_pool = self._get(resource_pool_id)
        if resource_pool:
            self.seen.add(resource_pool)
        return resource_pool

    def list(self, **kwargs) -> List[ocloud.ResourcePool]:
        """Return element ``[1]`` (the entity list) of the ``_list`` result.

        ``kwargs`` are forwarded unchanged to ``_list``.
        """
        return self._list(**kwargs)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourcePool]]:
        """Return the ``_list`` result as is.

        Positional and keyword arguments are forwarded unchanged to ``_list``.
        """
        return self._list(*args, **kwargs)

    def update(self, resource_pool: ocloud.ResourcePool):
        """Update ``resource_pool`` through ``_update`` and record it."""
        self._update(resource_pool)
        self.seen.add(resource_pool)

    @abc.abstractmethod
    def _add(self, resource_pool: ocloud.ResourcePool):
        """Hook behind ``add``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_pool_id) -> ocloud.ResourcePool:
        """Hook behind ``get``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.ResourcePool]]:
        """Hook behind ``list`` and ``list_with_count``.

        Accepts positional arguments as well as keywords because
        ``list_with_count`` forwards both.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource_pool: ocloud.ResourcePool):
        """Hook behind ``update``; must be implemented by subclasses."""
        raise NotImplementedError
class ResourceRepository(abc.ABC):
    """Abstract repository for ``ocloud.Resource`` entities.

    The public methods delegate to underscore-prefixed hooks supplied by a
    concrete subclass and record the entities they handle in ``seen``.
    Listing takes a ``resourcepool_id`` that is passed to the ``_list`` hook.
    """

    def __init__(self):
        # Entities handled by add/get/update on this repository instance.
        self.seen = set()  # type: Set[ocloud.Resource]

    def add(self, resource: ocloud.Resource):
        """Store ``resource`` through ``_add`` and record it in ``seen``."""
        self._add(resource)
        self.seen.add(resource)

    def get(self, resource_id) -> ocloud.Resource:
        """Return whatever ``_get`` yields for ``resource_id``.

        A truthy result is recorded in ``seen``; a falsy result (for example
        ``None``) is returned unchanged and not recorded.
        """
        resource = self._get(resource_id)
        if resource:
            self.seen.add(resource)
        return resource

    def list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
        """Return element ``[1]`` (the entity list) of the ``_list`` result.

        ``resourcepool_id`` and ``kwargs`` are forwarded unchanged to
        ``_list``.
        """
        return self._list(resourcepool_id, **kwargs)[1]

    def list_with_count(self, resourcepool_id, *args, **kwargs) -> \
            Tuple[int, List[ocloud.Resource]]:
        """Return the ``_list`` result as is.

        ``resourcepool_id`` and any extra positional and keyword arguments
        are forwarded unchanged to ``_list``.
        """
        return self._list(resourcepool_id, *args, **kwargs)

    def update(self, resource: ocloud.Resource):
        """Update ``resource`` through ``_update`` and record it in ``seen``."""
        self._update(resource)
        self.seen.add(resource)

    @abc.abstractmethod
    def _add(self, resource: ocloud.Resource):
        """Hook behind ``add``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, resource_id) -> ocloud.Resource:
        """Hook behind ``get``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _list(self, resourcepool_id, *args, **kwargs) -> \
            Tuple[int, List[ocloud.Resource]]:
        """Hook behind ``list`` and ``list_with_count``.

        Accepts extra positional arguments as well as keywords because
        ``list_with_count`` forwards both after ``resourcepool_id``.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, resource: ocloud.Resource):
        """Hook behind ``update``; must be implemented by subclasses."""
        raise NotImplementedError
class DeploymentManagerRepository(abc.ABC):
    """Abstract repository for ``ocloud.DeploymentManager`` entities.

    The public methods delegate to underscore-prefixed hooks supplied by a
    concrete subclass and record the entities they handle in ``seen``.
    """

    def __init__(self):
        # Entities handled by add/get/update on this repository instance.
        self.seen = set()  # type: Set[ocloud.DeploymentManager]

    def add(self, deployment_manager: ocloud.DeploymentManager):
        """Store ``deployment_manager`` through ``_add`` and record it."""
        self._add(deployment_manager)
        self.seen.add(deployment_manager)

    def get(self, deployment_manager_id) -> ocloud.DeploymentManager:
        """Return whatever ``_get`` yields for ``deployment_manager_id``.

        A truthy result is recorded in ``seen``; a falsy result (for example
        ``None``) is returned unchanged and not recorded.
        """
        deployment_manager = self._get(deployment_manager_id)
        if deployment_manager:
            self.seen.add(deployment_manager)
        return deployment_manager

    def list(self, **kwargs) -> List[ocloud.DeploymentManager]:
        """Return element ``[1]`` (the entity list) of the ``_list`` result.

        ``kwargs`` are forwarded unchanged to ``_list``.
        """
        return self._list(**kwargs)[1]

    def list_with_count(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.DeploymentManager]]:
        """Return the ``_list`` result as is.

        Positional and keyword arguments are forwarded unchanged to ``_list``.
        """
        return self._list(*args, **kwargs)

    def update(self, deployment_manager: ocloud.DeploymentManager):
        """Update ``deployment_manager`` through ``_update`` and record it."""
        self._update(deployment_manager)
        # Record updated entities in ``seen`` just as ``add`` and ``get`` do,
        # so an update is not the one operation that goes untracked.
        self.seen.add(deployment_manager)

    @abc.abstractmethod
    def _add(self, deployment_manager: ocloud.DeploymentManager):
        """Hook behind ``add``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
        """Hook behind ``get``; must be implemented by subclasses."""
        raise NotImplementedError

    @abc.abstractmethod
    def _list(self, *args, **kwargs) -> \
            Tuple[int, List[ocloud.DeploymentManager]]:
        """Hook behind ``list`` and ``list_with_count``.

        Accepts positional arguments as well as keywords because
        ``list_with_count`` forwards both.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, deployment_manager: ocloud.DeploymentManager):
        """Hook behind ``update``; must be implemented by subclasses."""
        raise NotImplementedError