# Copyright (C) 2021 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 from typing import List
16 from o2dms.domain import dms, dms_repo
class NfDeploymentDescSqlAlchemyRepository(
        dms_repo.NfDeploymentDescRepository):
    """Repository for ``dms.NfDeploymentDesc`` backed by a session object.

    Every operation goes through the session handed to the constructor;
    nothing in this class commits or rolls back.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        Args:
            session: object providing the ``add`` and ``query`` methods used
                by this class (presumably a SQLAlchemy ``Session``).
        """
        # NOTE(review): the base class lives in another module; this assumes
        # its initializer takes no arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, nfdeployment_desc: dms.NfDeploymentDesc):
        """Register *nfdeployment_desc* with the session via ``add``."""
        self.session.add(nfdeployment_desc)

    def _get(self, nfdeployment_desc_id) -> dms.NfDeploymentDesc:
        """Return the first descriptor whose ``id`` is *nfdeployment_desc_id*.

        The result is whatever ``Query.first()`` yields; per SQLAlchemy that
        is ``None`` when no row matches.
        """
        return self.session.query(dms.NfDeploymentDesc).filter_by(
            id=nfdeployment_desc_id).first()

    def _list(self) -> List[dms.NfDeploymentDesc]:
        """Return a query over every ``dms.NfDeploymentDesc``.

        The value is the un-evaluated query object, so callers can iterate
        it directly or refine it further before it is executed.
        """
        # The mapped class must be named here: a query built without any
        # entity has nothing to select from and cannot be executed.
        return self.session.query(dms.NfDeploymentDesc)

    def _update(self, nfdeployment_desc_id, **kwargs):
        """Apply *kwargs* as column updates to the matching descriptor.

        Args:
            nfdeployment_desc_id: ``id`` of the descriptor to modify.
            **kwargs: attribute names mapped to their new values.
        """
        # Query.update() takes the new values as ONE mapping argument, so the
        # keyword dict is passed through as-is instead of being re-expanded.
        self.session.query(dms.NfDeploymentDesc).filter_by(
            id=nfdeployment_desc_id).update(kwargs)

    def _delete(self, nfdeployment_desc_id):
        """Delete the descriptor whose ``id`` is *nfdeployment_desc_id*."""
        self.session.query(dms.NfDeploymentDesc).filter_by(
            id=nfdeployment_desc_id).delete()

    def _count(self, **kwargs):
        """Return how many descriptors match the *kwargs* equality filters."""
        return self.session.query(
            dms.NfDeploymentDesc).filter_by(**kwargs).count()
class NfDeploymentSqlAlchemyRepository(
        dms_repo.NfDeploymentRepository):
    """Repository for ``dms.NfDeployment`` backed by a session object.

    Every operation goes through the session handed to the constructor;
    nothing in this class commits or rolls back.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        Args:
            session: object providing the ``add`` and ``query`` methods used
                by this class (presumably a SQLAlchemy ``Session``).
        """
        # NOTE(review): the base class lives in another module; this assumes
        # its initializer takes no arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, nfdeployment: dms.NfDeployment):
        """Register *nfdeployment* with the session via ``add``."""
        self.session.add(nfdeployment)

    def _get(self, nfdeployment_id) -> dms.NfDeployment:
        """Return the first deployment whose ``id`` is *nfdeployment_id*.

        The result is whatever ``Query.first()`` yields; per SQLAlchemy that
        is ``None`` when no row matches.
        """
        return self.session.query(dms.NfDeployment).filter_by(
            id=nfdeployment_id).first()

    def _list(self) -> List[dms.NfDeployment]:
        """Return a query over every ``dms.NfDeployment``.

        The value is the un-evaluated query object, so callers can iterate
        it directly or refine it further before it is executed.
        """
        # The mapped class must be named here: a query built without any
        # entity has nothing to select from and cannot be executed.
        return self.session.query(dms.NfDeployment)

    def _update(self, nfdeployment_id, **kwargs):
        """Apply *kwargs* as column updates to the matching deployment.

        Args:
            nfdeployment_id: ``id`` of the deployment to modify.
            **kwargs: attribute names mapped to their new values.
        """
        # Query.update() takes the new values as ONE mapping argument, so the
        # keyword dict is passed through as-is instead of being re-expanded.
        self.session.query(dms.NfDeployment).filter_by(
            id=nfdeployment_id).update(kwargs)

    def _delete(self, nfdeployment_id):
        """Delete the deployment whose ``id`` is *nfdeployment_id*."""
        # Restrict by id so only the requested deployment is removed.
        self.session.query(dms.NfDeployment).filter_by(
            id=nfdeployment_id).delete()

    def _count(self, **kwargs):
        """Return how many deployments match the *kwargs* equality filters."""
        return self.session.query(
            dms.NfDeployment).filter_by(**kwargs).count()
81 class NfOCloudVResourceSqlAlchemyRepository(
82 dms_repo.NfOCloudVResourceRepository):
def __init__(self, session):
    """Bind the repository to *session*.

    Args:
        session: object providing the ``add`` and ``query`` methods used by
            the other methods of this class (presumably a SQLAlchemy
            ``Session``).
    """
    # NOTE(review): the base class lives in another module; this assumes its
    # initializer takes no arguments -- confirm.
    super().__init__()
    self.session = session
def _add(self, nfocloudvres: dms.NfOCloudVResource):
    """Hand *nfocloudvres* to the bound session through ``add``."""
    db_session = self.session
    db_session.add(nfocloudvres)
def _get(self, nfocloudvres_id) -> dms.NfOCloudVResource:
    """Look up one ``dms.NfOCloudVResource`` by its ``id``.

    Returns the value of ``first()`` on the id-filtered query.
    """
    matching = self.session.query(dms.NfOCloudVResource).filter_by(
        id=nfocloudvres_id)
    return matching.first()
def _list(self) -> List[dms.NfOCloudVResource]:
    """Return a query over every ``dms.NfOCloudVResource``.

    The value is the un-evaluated query object, so callers can iterate it
    directly or refine it further before it is executed.
    """
    # The mapped class must be named here: a query built without any entity
    # has nothing to select from and cannot be executed.
    return self.session.query(dms.NfOCloudVResource)
def _update(self, nfocloudvres, **kwargs):
    """Apply *kwargs* as column updates to the matching resource.

    Args:
        nfocloudvres: value compared against the ``id`` column to select the
            row to modify.
        **kwargs: attribute names mapped to their new values.
    """
    # Query.update() takes the new values as ONE mapping argument, so the
    # keyword dict is passed through as-is instead of being re-expanded.
    self.session.query(dms.NfOCloudVResource).filter_by(
        id=nfocloudvres).update(kwargs)
102 def _delete(self, nfocloudvres):
103 self.session.query(dms.NfOCloudVResource).filter_by(