Docs: Enable O2 DMS by exposing k8s API endpoint
[pti/o2.git] / o2dms / domain / dms_repo.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import abc
16 from typing import List, Set
17 from o2dms.domain import dms
18
19
class NfDeploymentRepository(abc.ABC):
    """Abstract repository for ``dms.NfDeployment`` entities.

    Each public method delegates to an underscore-prefixed hook that a
    concrete subclass implements. Entities passed to :meth:`add`, or
    returned truthy by :meth:`get`, are also recorded in ``self.seen``.
    """

    def __init__(self):
        # Entities added or fetched through this repository instance.
        self.seen = set()  # type: Set[dms.NfDeployment]

    def add(self, nfdeployment: dms.NfDeployment):
        """Store *nfdeployment* via ``_add`` and record it in ``seen``."""
        self._add(nfdeployment)
        self.seen.add(nfdeployment)

    def get(self, nfdeployment_id) -> dms.NfDeployment:
        """Return whatever ``_get`` yields for *nfdeployment_id*.

        A truthy result is recorded in ``seen``; a falsy result (for
        example ``None``) is returned unchanged and is not recorded.
        """
        nfdeployment = self._get(nfdeployment_id)
        if nfdeployment:
            self.seen.add(nfdeployment)
        return nfdeployment

    def list(self) -> List[dms.NfDeployment]:
        """Return the result of the ``_list`` hook."""
        return self._list()

    def update(self, id, **kwargs):
        """Forward *id* and the keyword arguments to ``_update``."""
        self._update(id, **kwargs)

    def delete(self, nfdeployment_id):
        """Forward *nfdeployment_id* to ``_delete``."""
        self._delete(nfdeployment_id)

    def count(self, **kwargs):
        """Return the result of ``_count`` called with *kwargs*."""
        return self._count(**kwargs)

    @abc.abstractmethod
    def _add(self, nfdeployment: dms.NfDeployment):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, nfdeployment_id) -> dms.NfDeployment:
        raise NotImplementedError

    def _list(self) -> List[dms.NfDeployment]:
        """Backend hook for :meth:`list`; subclasses should override it.

        Deliberately not an ``abc.abstractmethod``: subclasses written
        before this hook was declared must remain instantiable. Without
        an override, :meth:`list` raises ``NotImplementedError`` rather
        than an ``AttributeError`` for a missing attribute.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, id, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def _delete(self, nfdeployment_id):
        raise NotImplementedError

    @abc.abstractmethod
    def _count(self, **kwargs):
        raise NotImplementedError
65
66
class NfDeploymentDescRepository(abc.ABC):
    """Abstract repository for ``dms.NfDeploymentDesc`` entities.

    Each public method delegates to an underscore-prefixed hook that a
    concrete subclass implements. Descriptors passed to :meth:`add`, or
    returned truthy by :meth:`get`, are also recorded in ``self.seen``.
    """

    def __init__(self):
        # Descriptors added or fetched through this repository instance.
        self.seen = set()  # type: Set[dms.NfDeploymentDesc]

    def add(self, nfdeployment_descriptor: dms.NfDeploymentDesc):
        """Store the descriptor via ``_add`` and record it in ``seen``."""
        self._add(nfdeployment_descriptor)
        self.seen.add(nfdeployment_descriptor)

    def get(self, nfdeployment_descriptor_id) -> dms.NfDeploymentDesc:
        """Return whatever ``_get`` yields for the given id.

        A truthy result is recorded in ``seen``; a falsy result (for
        example ``None``) is returned unchanged and is not recorded.
        """
        nfdeployment_descriptor = self._get(nfdeployment_descriptor_id)
        if nfdeployment_descriptor:
            self.seen.add(nfdeployment_descriptor)
        return nfdeployment_descriptor

    def list(self) -> List[dms.NfDeploymentDesc]:
        """Return the result of the ``_list`` hook."""
        return self._list()

    def update(self, id, **kwargs):
        """Forward *id* and the keyword arguments to ``_update``."""
        self._update(id, **kwargs)

    def delete(self, nfdeployment_descriptor_id):
        """Forward *nfdeployment_descriptor_id* to ``_delete``."""
        self._delete(nfdeployment_descriptor_id)

    def count(self, **kwargs):
        """Return the result of ``_count`` called with *kwargs*."""
        return self._count(**kwargs)

    @abc.abstractmethod
    def _add(self, nfdeployment_descriptor: dms.NfDeploymentDesc):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, nfdeployment_descriptor_id) -> dms.NfDeploymentDesc:
        raise NotImplementedError

    def _list(self) -> List[dms.NfDeploymentDesc]:
        """Backend hook for :meth:`list`; subclasses should override it.

        Deliberately not an ``abc.abstractmethod``: subclasses written
        before this hook was declared must remain instantiable. Without
        an override, :meth:`list` raises ``NotImplementedError`` rather
        than an ``AttributeError`` for a missing attribute.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, id, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def _delete(self, nfdeployment_descriptor_id):
        raise NotImplementedError

    @abc.abstractmethod
    def _count(self, **kwargs):
        raise NotImplementedError
112
113
class NfOCloudVResourceRepository(abc.ABC):
    """Abstract repository for ``dms.NfOCloudVResource`` entities.

    Each public method delegates to an underscore-prefixed hook that a
    concrete subclass implements. Resources passed to :meth:`add`, or
    returned truthy by :meth:`get`, are also recorded in ``self.seen``.
    """

    def __init__(self):
        # Resources added or fetched through this repository instance.
        self.seen = set()  # type: Set[dms.NfOCloudVResource]

    def add(self, nfocloudvres: dms.NfOCloudVResource):
        """Store *nfocloudvres* via ``_add`` and record it in ``seen``."""
        self._add(nfocloudvres)
        self.seen.add(nfocloudvres)

    def get(self, nfocloudvres_id) -> dms.NfOCloudVResource:
        """Return whatever ``_get`` yields for *nfocloudvres_id*.

        A truthy result is recorded in ``seen``; a falsy result (for
        example ``None``) is returned unchanged and is not recorded.
        """
        nfocloudvres = self._get(nfocloudvres_id)
        if nfocloudvres:
            self.seen.add(nfocloudvres)
        return nfocloudvres

    def list(self) -> List[dms.NfOCloudVResource]:
        """Return the result of the ``_list`` hook."""
        return self._list()

    def update(self, nfocloudvres_id, **kwargs):
        """Forward the id and the keyword arguments to ``_update``."""
        self._update(nfocloudvres_id, **kwargs)

    def delete(self, nfocloudvres_id):
        """Forward *nfocloudvres_id* to ``_delete``."""
        self._delete(nfocloudvres_id)

    @abc.abstractmethod
    def _add(self, nfocloudvres: dms.NfOCloudVResource):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, nfocloudvres_id) -> dms.NfOCloudVResource:
        raise NotImplementedError

    def _list(self) -> List[dms.NfOCloudVResource]:
        """Backend hook for :meth:`list`; subclasses should override it.

        Deliberately not an ``abc.abstractmethod``: subclasses written
        before this hook was declared must remain instantiable. Without
        an override, :meth:`list` raises ``NotImplementedError`` rather
        than an ``AttributeError`` for a missing attribute.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _update(self, nfocloudvres_id, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def _delete(self, nfocloudvres_id):
        raise NotImplementedError