1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from o2common.config import config
19 from o2ims.views import ocloud_view
20 from o2ims.domain import ocloud, subscription_obj
21 from o2ims.domain import resource_type as rt
# Module-level pytest marker: applies the "mappers" fixture to the tests in
# this module.
# NOTE(review): `pytest` and `uuid` are used in this file but their imports
# are not visible in this listing (embedded numbering skips 14-17) — confirm
# they are imported in the original file.
24 pytestmark = pytest.mark.usefixtures("mappers")
# Builds an Ocloud domain object from a fresh random UUID string, the name
# "ocloud1", the value of config.get_api_url(), a description string and the
# literal 1 (the meaning of that last argument is not shown here).
# NOTE(review): the tests below call setup_ocloud(), whose `def` is not
# visible; these lines are presumably its body, with the header and return
# statement lost from the listing (embedded numbering skips 25-27 and
# 32-34) — confirm against the original file.
28 ocloudid1 = str(uuid.uuid4())
29 ocloud1 = ocloud.Ocloud(
30 ocloudid1, "ocloud1", config.get_api_url(),
31 "ocloud 1 for integration test", 1)
# Adds one Ocloud through the unit of work, lists oclouds via
# ocloud_view.oclouds(), and checks that the first entry's "oCloudId"
# (as a string) equals the id of the object that was added.
# NOTE(review): "olcouds" in the test name looks like a typo for "oclouds";
# the name is left unchanged here.
35 def test_view_olcouds(sqlite_uow):
36 ocloud1 = setup_ocloud()
37 ocloud1_UUID = ocloud1.oCloudId
38 with sqlite_uow as uow:
39 uow.oclouds.add(ocloud1)
# NOTE(review): embedded numbering skips 40-41 here, so at least one line
# (possibly a commit) is missing from this listing, and the extent of the
# `with` body cannot be determined because indentation was lost — confirm.
42 ocloud_list = ocloud_view.oclouds(uow)
43 assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
# Checks ocloud_view.ocloud_one() for a single Ocloud: it must return None
# before the object is added, and afterwards return an entry whose
# "oCloudId" (as a string) equals the added object's id.
# NOTE(review): "olcoud" in the test name looks like a typo for "ocloud";
# the name is left unchanged here.
46 def test_view_olcoud_one(sqlite_uow):
47 ocloud1 = setup_ocloud()
48 ocloud1_UUID = ocloud1.oCloudId
# Lookup before anything has been added: expected to yield None.
51 ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, sqlite_uow)
52 assert ocloud_res is None
54 with sqlite_uow as uow:
55 uow.oclouds.add(ocloud1)
56 # INSERT INTO ocloud (oCloudId, name) VALUES (ocloud1_UUID, 'ocloud1')
# NOTE(review): embedded line 57 is missing from this listing (possibly a
# commit), and the extent of the `with` body is not recoverable without
# indentation — confirm against the original file.
58 ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
59 assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
# Adds one ResourceType through the unit of work, lists resource types via
# ocloud_view.resource_types(), and checks that the first entry's
# "resourceTypeId" (as a string) equals the generated id.
62 def test_view_resource_types(sqlite_uow):
# NOTE(review): ocloud1 is not referenced again in the visible lines of this
# test; it may feed the truncated constructor call below — confirm.
63 ocloud1 = setup_ocloud()
64 resource_type_id1 = str(uuid.uuid4())
65 resource_type1 = ocloud.ResourceType(
66 resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
# NOTE(review): the ResourceType(...) call above is cut off — its remaining
# argument(s) and closing parenthesis (embedded line 67) are missing from
# this listing.
68 with sqlite_uow as uow:
69 uow.resource_types.add(resource_type1)
# NOTE(review): embedded numbering skips 70-71 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
72 resource_type_list = ocloud_view.resource_types(uow)
73 assert str(resource_type_list[0].get(
74 "resourceTypeId")) == resource_type_id1
# Checks ocloud_view.resource_type_one() for a single ResourceType: it must
# return None before the object is added, and afterwards return an entry
# whose "resourceTypeId" (as a string) equals the generated id.
77 def test_view_resource_type_one(sqlite_uow):
# NOTE(review): ocloud1 is not referenced again in the visible lines of this
# test; it may feed the truncated constructor call below — confirm.
78 ocloud1 = setup_ocloud()
79 resource_type_id1 = str(uuid.uuid4())
80 resource_type1 = ocloud.ResourceType(
81 resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
# NOTE(review): the ResourceType(...) call above is cut off — embedded lines
# 82-84, including the end of the call, are missing from this listing.
# Lookup before anything has been added: expected to yield None.
85 resource_type_res = ocloud_view.resource_type_one(
86 resource_type_id1, sqlite_uow)
87 assert resource_type_res is None
89 with sqlite_uow as uow:
90 uow.resource_types.add(resource_type1)
# NOTE(review): embedded line 91 is missing here (possibly a commit); the
# extent of the `with` body is not recoverable without indentation.
92 resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
93 assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
# Adds one ResourcePool through the unit of work, lists resource pools via
# ocloud_view.resource_pools(), and checks that the first entry's
# "resourcePoolId" (as a string) equals the generated id.
96 def test_view_resource_pools(sqlite_uow):
# NOTE(review): ocloud1 is not referenced again in the visible lines of this
# test; it may feed the truncated constructor call below — confirm.
97 ocloud1 = setup_ocloud()
98 resource_pool_id1 = str(uuid.uuid4())
99 resource_pool1 = ocloud.ResourcePool(
100 resource_pool_id1, "resourcepool1", config.get_api_url(),
# NOTE(review): the ResourcePool(...) call above is cut off — its remaining
# argument(s) and closing parenthesis (embedded line 101) are missing from
# this listing.
102 with sqlite_uow as uow:
103 uow.resource_pools.add(resource_pool1)
# NOTE(review): embedded numbering skips 104-105 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
106 resource_pool_list = ocloud_view.resource_pools(uow)
107 assert str(resource_pool_list[0].get(
108 "resourcePoolId")) == resource_pool_id1
# Checks ocloud_view.resource_pool_one() for a single ResourcePool: it must
# return None before the object is added, and afterwards return an entry
# whose "resourcePoolId" (as a string) equals the generated id.
111 def test_view_resource_pool_one(sqlite_uow):
# NOTE(review): ocloud1 is not referenced again in the visible lines of this
# test; it may feed the truncated constructor call below — confirm.
112 ocloud1 = setup_ocloud()
113 resource_pool_id1 = str(uuid.uuid4())
114 resource_pool1 = ocloud.ResourcePool(
115 resource_pool_id1, "resourcepool1", config.get_api_url(),
# NOTE(review): the ResourcePool(...) call above is cut off — embedded lines
# 116-118, including the end of the call, are missing from this listing.
# Lookup before anything has been added: expected to yield None.
119 resource_pool_res = ocloud_view.resource_pool_one(
120 resource_pool_id1, sqlite_uow)
121 assert resource_pool_res is None
123 with sqlite_uow as uow:
124 uow.resource_pools.add(resource_pool1)
# NOTE(review): embedded line 125 is missing here (possibly a commit); the
# extent of the `with` body is not recoverable without indentation.
126 resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
127 assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
# Adds one Resource (with freshly generated resource, resource-type and
# resource-pool ids) through the unit of work, lists resources for that
# pool id via ocloud_view.resources(), and checks that the first entry's
# "resourceId" (as a string) equals the generated resource id.
130 def test_view_resources(sqlite_uow):
131 resource_id1 = str(uuid.uuid4())
132 resource_type_id1 = str(uuid.uuid4())
133 resource_pool_id1 = str(uuid.uuid4())
134 resource1 = ocloud.Resource(
135 resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
136 with sqlite_uow as uow:
137 uow.resources.add(resource1)
# NOTE(review): embedded numbering skips 138-139 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
140 resource_list = ocloud_view.resources(resource_pool_id1, uow)
141 assert str(resource_list[0].get("resourceId")) == resource_id1
# Checks ocloud_view.resource_one() for a single Resource: it must return
# None before the object is added, and afterwards return an entry whose
# "resourceId" (as a string) equals the generated resource id.
144 def test_view_resource_one(sqlite_uow):
145 resource_id1 = str(uuid.uuid4())
146 resource_type_id1 = str(uuid.uuid4())
147 resource_pool_id1 = str(uuid.uuid4())
148 resource1 = ocloud.Resource(
149 resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
# Lookup before anything has been added: expected to yield None.
152 resource_res = ocloud_view.resource_one(resource_id1, sqlite_uow)
153 assert resource_res is None
155 with sqlite_uow as uow:
156 uow.resources.add(resource1)
# NOTE(review): embedded numbering skips 157-158 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
159 resource_res = ocloud_view.resource_one(resource_id1, uow)
160 assert str(resource_res.get("resourceId")) == resource_id1
# Adds one DeploymentManager named "k8s1" (URL built from
# config.get_api_url() + "/k8s1") through the unit of work, lists
# deployment managers via ocloud_view.deployment_managers(), and checks
# that the first entry's "deploymentManagerId" (as a string) equals the
# generated id.
163 def test_view_deployment_managers(sqlite_uow):
164 ocloud_id1 = str(uuid.uuid4())
165 deployment_manager_id1 = str(uuid.uuid4())
166 deployment_manager1 = ocloud.DeploymentManager(
167 deployment_manager_id1, "k8s1", ocloud_id1,
168 config.get_api_url()+"/k8s1")
169 with sqlite_uow as uow:
170 uow.deployment_managers.add(deployment_manager1)
# NOTE(review): embedded numbering skips 171-172 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
173 deployment_manager_list = ocloud_view.deployment_managers(uow)
174 assert str(deployment_manager_list[0].get(
175 "deploymentManagerId")) == deployment_manager_id1
# Checks ocloud_view.deployment_manager_one() for a single
# DeploymentManager: it must return None before the object is added, and
# afterwards return an entry whose "deploymentManagerId" (as a string)
# equals the generated id.
178 def test_view_deployment_manager_one(sqlite_uow):
179 ocloud_id1 = str(uuid.uuid4())
180 deployment_manager_id1 = str(uuid.uuid4())
181 deployment_manager1 = ocloud.DeploymentManager(
182 deployment_manager_id1, "k8s1", ocloud_id1,
183 config.get_api_url()+"/k8s1")
# Lookup before anything has been added: expected to yield None.
186 deployment_manager_res = ocloud_view.deployment_manager_one(
187 deployment_manager_id1, sqlite_uow)
188 assert deployment_manager_res is None
190 with sqlite_uow as uow:
191 uow.deployment_managers.add(deployment_manager1)
# NOTE(review): embedded numbering skips 192-193 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
# The second lookup passes the sqlite_uow fixture itself rather than `uow`.
194 deployment_manager_res = ocloud_view.deployment_manager_one(
195 deployment_manager_id1, sqlite_uow)
196 assert str(deployment_manager_res.get(
197 "deploymentManagerId")) == deployment_manager_id1
# Adds one Subscription (random id plus a placeholder callback URI) through
# the unit of work, lists subscriptions via ocloud_view.subscriptions(),
# and checks that the first entry's "subscriptionId" (as a string) equals
# the generated id.
200 def test_view_subscriptions(sqlite_uow):
202 subscription_id1 = str(uuid.uuid4())
203 subscription1 = subscription_obj.Subscription(
204 subscription_id1, "https://callback/uri/write/here")
205 with sqlite_uow as uow:
206 uow.subscriptions.add(subscription1)
# NOTE(review): embedded numbering skips 207-208 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
209 subscription_list = ocloud_view.subscriptions(uow)
210 assert str(subscription_list[0].get(
211 "subscriptionId")) == subscription_id1
# Checks ocloud_view.subscription_one() for a single Subscription: it must
# return None before the object is added, and afterwards return an entry
# whose "subscriptionId" (as a string) equals the generated id.
214 def test_view_subscription_one(sqlite_uow):
216 subscription_id1 = str(uuid.uuid4())
217 subscription1 = subscription_obj.Subscription(
218 subscription_id1, "https://callback/uri/write/here")
# Lookup before anything has been added: expected to yield None.
221 subscription_res = ocloud_view.subscription_one(
222 subscription_id1, sqlite_uow)
223 assert subscription_res is None
225 with sqlite_uow as uow:
226 uow.subscriptions.add(subscription1)
# NOTE(review): embedded numbering skips 227-228 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
# The second lookup passes the sqlite_uow fixture itself rather than `uow`.
229 subscription_res = ocloud_view.subscription_one(
230 subscription_id1, sqlite_uow)
231 assert str(subscription_res.get(
232 "subscriptionId")) == subscription_id1
# Exercises the add-then-delete path for a Subscription: after adding,
# ocloud_view.subscription_one() must return an entry with the generated
# "subscriptionId"; after uow.subscriptions.delete() with that id, the same
# lookup must return None.
235 def test_view_subscription_delete(sqlite_uow):
237 subscription_id1 = str(uuid.uuid4())
238 subscription1 = subscription_obj.Subscription(
239 subscription_id1, "https://callback/uri/write/here")
241 with sqlite_uow as uow:
242 uow.subscriptions.add(subscription1)
# NOTE(review): embedded numbering skips 243-244 here (possibly a commit);
# the extent of the `with` body is not recoverable without indentation.
245 subscription_res = ocloud_view.subscription_one(
246 subscription_id1, sqlite_uow)
247 assert str(subscription_res.get(
248 "subscriptionId")) == subscription_id1
250 with sqlite_uow as uow:
251 uow.subscriptions.delete(subscription_id1)
# NOTE(review): embedded numbering skips 252-253 here (possibly a commit);
# confirm against the original file.
# After the delete, the lookup is expected to yield None again.
254 subscription_res = ocloud_view.subscription_one(
255 subscription_id1, sqlite_uow)
256 assert subscription_res is None