Add error handling for ocloud route
[pti/o2.git] / tests / unit / test_ocloud.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import uuid
16 from unittest.mock import MagicMock
17
18 from o2ims.domain import ocloud, subscription_obj
19 from o2ims.domain import resource_type as rt
20 from o2ims.views import ocloud_view
21 from o2common.config import config
22
23
24 def setup_ocloud():
25     ocloudid1 = str(uuid.uuid4())
26     ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
27                             config.get_api_url(), "ocloud for unit test", 1)
28     return ocloud1
29
30
31 def test_new_ocloud():
32     ocloudid1 = str(uuid.uuid4())
33     ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
34                             config.get_api_url(), "ocloud for unit test", 1)
35     assert ocloudid1 is not None and ocloud1.oCloudId == ocloudid1
36
37
38 # def test_add_ocloud_with_dms():
39 #     ocloud1 = setup_ocloud()
40 #     dmsid = str(uuid.uuid4())
41 #     dms = ocloud.DeploymentManager(
42 #         dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
43 #     ocloud1.addDeploymentManager(dms)
44 #     ocloud1.addDeploymentManager(dms)
45 #     assert len(ocloud1.deploymentManagers) == 1
46 #     # repo.update(ocloud1.oCloudId, {
47 #     #             "deploymentManagers": ocloud1.deploymentManagers})
48
49
50 def test_new_resource_type():
51     ocloud1 = setup_ocloud()
52     resource_type_id1 = str(uuid.uuid4())
53     resource_type1 = ocloud.ResourceType(
54         resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
55         ocloud1.oCloudId)
56     assert resource_type_id1 is not None and \
57         resource_type1.resourceTypeId == resource_type_id1
58
59
60 def test_new_resource_pool():
61     ocloud1 = setup_ocloud()
62     resource_pool_id1 = str(uuid.uuid4())
63     resource_pool1 = ocloud.ResourcePool(
64         resource_pool_id1, "resourcepool1", config.get_api_url(),
65         ocloud1.oCloudId)
66     assert resource_pool_id1 is not None and \
67         resource_pool1.resourcePoolId == resource_pool_id1
68
69
70 def test_new_resource():
71     resource_id1 = str(uuid.uuid4())
72     resource_type_id1 = str(uuid.uuid4())
73     resource_pool_id1 = str(uuid.uuid4())
74     resource1 = ocloud.Resource(
75         resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
76     assert resource_id1 is not None and resource1.resourceId == resource_id1
77
78
79 def test_new_deployment_manager():
80     ocloud_id1 = str(uuid.uuid4())
81     deployment_manager_id1 = str(uuid.uuid4())
82     deployment_manager1 = ocloud.DeploymentManager(
83         deployment_manager_id1, "k8s1", ocloud_id1,
84         config.get_api_url()+"/k8s1")
85     assert deployment_manager_id1 is not None and deployment_manager1.\
86         deploymentManagerId == deployment_manager_id1
87
88
89 def test_new_subscription():
90     subscription_id1 = str(uuid.uuid4())
91     subscription1 = subscription_obj.Subscription(
92         subscription_id1, "https://callback/uri/write/here")
93     assert subscription_id1 is not None and\
94         subscription1.subscriptionId == subscription_id1
95
96
97 def test_view_olcouds(mock_uow):
98     session, uow = mock_uow
99
100     ocloud1_UUID = str(uuid.uuid4)
101     ocloud1 = MagicMock()
102     ocloud1.serialize.return_value = {
103         'oCloudId': ocloud1_UUID, 'name': 'ocloud1'}
104     session.return_value.query.return_value = [ocloud1]
105
106     ocloud_list = ocloud_view.oclouds(uow)
107     # assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
108     assert len(ocloud_list) == 1
109
110
111 def test_view_olcoud_one(mock_uow):
112     session, uow = mock_uow
113
114     ocloud1_UUID = str(uuid.uuid4)
115     session.return_value.query.return_value.filter_by.return_value.first.\
116         return_value.serialize.return_value = None
117
118     # Query return None
119     ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
120     assert ocloud_res is None
121
122     session.return_value.query.return_value.filter_by.return_value.first.\
123         return_value.serialize.return_value = {
124             "oCloudId": ocloud1_UUID}
125
126     ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
127     assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
128
129
130 def test_view_resource_types(mock_uow):
131     session, uow = mock_uow
132
133     resource_type_id1 = str(uuid.uuid4())
134     restype1 = MagicMock()
135     restype1.serialize.return_value = {
136         "resourceTypeId": resource_type_id1}
137
138     order_by = MagicMock()
139     order_by.count.return_value = 1
140     order_by.limit.return_value.offset.return_value = [restype1]
141     session.return_value.query.return_value.filter.return_value.\
142         order_by.return_value = order_by
143
144     result = ocloud_view.resource_types(uow)
145     assert result['count'] == 1
146     ret_list = result['results']
147     assert str(ret_list[0].get("resourceTypeId")) == resource_type_id1
148
149
150 def test_view_resource_type_one(mock_uow):
151     session, uow = mock_uow
152
153     resource_type_id1 = str(uuid.uuid4())
154     session.return_value.query.return_value.filter_by.return_value.first.\
155         return_value.serialize.return_value = None
156
157     # Query return None
158     resource_type_res = ocloud_view.resource_type_one(
159         resource_type_id1, uow)
160     assert resource_type_res is None
161
162     session.return_value.query.return_value.filter_by.return_value.first.\
163         return_value.serialize.return_value = {
164             "resourceTypeId": resource_type_id1}
165
166     resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
167     assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
168
169
170 def test_view_resource_pools(mock_uow):
171     session, uow = mock_uow
172
173     resource_pool_id1 = str(uuid.uuid4())
174     respool1 = MagicMock()
175     respool1.serialize.return_value = {
176         "resourcePoolId": resource_pool_id1}
177
178     order_by = MagicMock()
179     order_by.count.return_value = 1
180     order_by.limit.return_value.offset.return_value = [respool1]
181     session.return_value.query.return_value.filter.return_value.\
182         order_by.return_value = order_by
183
184     result = ocloud_view.resource_pools(uow)
185     assert result['count'] == 1
186     ret_list = result['results']
187     assert str(ret_list[0].get("resourcePoolId")) == resource_pool_id1
188
189
190 def test_view_resource_pool_one(mock_uow):
191     session, uow = mock_uow
192
193     resource_pool_id1 = str(uuid.uuid4())
194     session.return_value.query.return_value.filter_by.return_value.first.\
195         return_value.serialize.return_value = None
196
197     # Query return None
198     resource_pool_res = ocloud_view.resource_pool_one(
199         resource_pool_id1, uow)
200     assert resource_pool_res is None
201
202     session.return_value.query.return_value.filter_by.return_value.first.\
203         return_value.serialize.return_value = {
204             "resourcePoolId": resource_pool_id1
205         }
206
207     resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
208     assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
209
210
211 def test_view_resources(mock_uow):
212     session, uow = mock_uow
213
214     resource_id1 = str(uuid.uuid4())
215     resource_pool_id1 = str(uuid.uuid4())
216     res1 = MagicMock()
217     res1.serialize.return_value = {
218         "resourceId": resource_id1,
219         "resourcePoolId": resource_pool_id1
220     }
221
222     order_by = MagicMock()
223     order_by.count.return_value = 1
224     order_by.limit.return_value.offset.return_value = [res1]
225     session.return_value.query.return_value.filter.return_value.\
226         order_by.return_value = order_by
227     # TODO: workaround for sqlalchemy not mapping with resource object
228     setattr(ocloud.Resource, 'resourcePoolId', '')
229
230     result = ocloud_view.resources(resource_pool_id1, uow)
231     assert result['count'] == 1
232     resource_list = result['results']
233     assert str(resource_list[0].get("resourceId")) == resource_id1
234     assert str(resource_list[0].get("resourcePoolId")) == resource_pool_id1
235
236
237 def test_view_resource_one(mock_uow):
238     session, uow = mock_uow
239
240     resource_id1 = str(uuid.uuid4())
241     resource_pool_id1 = str(uuid.uuid4())
242     session.return_value.query.return_value.filter_by.return_value.first.\
243         return_value.serialize.return_value = None
244
245     # Query return None
246     resource_res = ocloud_view.resource_one(resource_id1, uow)
247     assert resource_res is None
248
249     session.return_value.query.return_value.filter_by.return_value.first.\
250         return_value.serialize.return_value = {
251             "resourceId": resource_id1,
252             "resourcePoolId": resource_pool_id1
253         }
254
255     resource_res = ocloud_view.resource_one(resource_id1, uow)
256     assert str(resource_res.get("resourceId")) == resource_id1
257
258
259 def test_view_deployment_managers(mock_uow):
260     session, uow = mock_uow
261
262     deployment_manager_id1 = str(uuid.uuid4())
263     dm1 = MagicMock()
264     dm1.serialize.return_value = {
265         "deploymentManagerId": deployment_manager_id1,
266     }
267
268     order_by = MagicMock()
269     order_by.count.return_value = 1
270     order_by.limit.return_value.offset.return_value = [dm1]
271     session.return_value.query.return_value.filter.return_value.\
272         order_by.return_value = order_by
273
274     result = ocloud_view.deployment_managers(uow)
275     assert result['count'] == 1
276     ret_list = result['results']
277     assert str(ret_list[0].get("deploymentManagerId")
278                ) == deployment_manager_id1
279
280
281 def test_view_deployment_manager_one(mock_uow):
282     session, uow = mock_uow
283
284     deployment_manager_id1 = str(uuid.uuid4())
285     session.return_value.query.return_value.filter_by.return_value.first.\
286         return_value.serialize.return_value = None
287
288     # Query return None
289     deployment_manager_res = ocloud_view.deployment_manager_one(
290         deployment_manager_id1, uow)
291     assert deployment_manager_res is None
292
293     dms_endpoint = "http://o2:30205/o2dms/v1/uuid"
294     session.return_value.query.return_value.filter_by.return_value.first.\
295         return_value.serialize.return_value = {
296             "deploymentManagerId": deployment_manager_id1,
297             "serviceUri": dms_endpoint,
298             "profile": {}
299         }
300
301     # profile default
302     deployment_manager_res = ocloud_view.deployment_manager_one(
303         deployment_manager_id1, uow)
304     assert str(deployment_manager_res.get(
305         "deploymentManagerId")) == deployment_manager_id1
306     assert str(deployment_manager_res.get(
307         'serviceUri')) == dms_endpoint
308     assert deployment_manager_res.get('profile') is None
309
310     # profile sol018
311     profileName = ocloud.DeploymentManagerProfileSOL018
312     cluster_endpoint = "https://test_k8s:6443"
313     session.return_value.query.return_value.filter_by.return_value.first.\
314         return_value.serialize.return_value['profile'] = {
315             "cluster_api_endpoint": cluster_endpoint
316         }
317     deployment_manager_res = ocloud_view.deployment_manager_one(
318         deployment_manager_id1, uow, profile=profileName)
319     assert str(deployment_manager_res.get(
320         'serviceUri')) == cluster_endpoint
321     assert str(deployment_manager_res.get(
322         "profileName")) == profileName
323
324     # profile wrong name
325     profileName = 'wrong_profile'
326     session.return_value.query.return_value.filter_by.return_value.first.\
327         return_value.serialize.return_value['profile'] = {
328             "cluster_api_endpoint": cluster_endpoint
329         }
330     deployment_manager_res = ocloud_view.deployment_manager_one(
331         deployment_manager_id1, uow, profile=profileName)
332     assert deployment_manager_res is None
333
334
335 def test_view_subscriptions(mock_uow):
336     session, uow = mock_uow
337
338     subscription_id1 = str(uuid.uuid4())
339     sub1 = MagicMock()
340     sub1.serialize.return_value = {
341         "subscriptionId": subscription_id1,
342     }
343
344     order_by = MagicMock()
345     order_by.count.return_value = 1
346     order_by.limit.return_value.offset.return_value = [sub1]
347     session.return_value.query.return_value.filter.return_value.\
348         order_by.return_value = order_by
349
350     result = ocloud_view.subscriptions(uow)
351     assert result['count'] == 1
352     ret_list = result['results']
353     assert str(ret_list[0].get("subscriptionId")) == subscription_id1
354
355
356 def test_view_subscription_one(mock_uow):
357     session, uow = mock_uow
358
359     subscription_id1 = str(uuid.uuid4())
360     session.return_value.query.return_value.filter_by.return_value.first.\
361         return_value.serialize.return_value = None
362
363     # Query return None
364     subscription_res = ocloud_view.subscription_one(
365         subscription_id1, uow)
366     assert subscription_res is None
367
368     session.return_value.query.return_value.filter_by.return_value.first.\
369         return_value.serialize.return_value = {
370             "subscriptionId": subscription_id1,
371         }
372
373     subscription_res = ocloud_view.subscription_one(
374         subscription_id1, uow)
375     assert str(subscription_res.get(
376         "subscriptionId")) == subscription_id1
377
378
379 def test_flask_get_list(mock_flask_uow):
380     session, app = mock_flask_uow
381     order_by = MagicMock()
382     order_by.count.return_value = 0
383     order_by.limit.return_value.offset.return_value = []
384     session.return_value.query.return_value.filter.return_value.\
385         order_by.return_value = order_by
386     apibase = config.get_o2ims_api_base() + '/v1'
387     # TODO: workaround for sqlalchemy not mapping with resource object
388     setattr(ocloud.Resource, 'resourcePoolId', '')
389
390     with app.test_client() as client:
391         # Get list and return empty list
392         ##########################
393         resp = client.get(apibase+"/resourceTypes")
394         assert resp.get_data() == b'[]\n'
395
396         resp = client.get(apibase+"/resourcePools")
397         assert resp.get_data() == b'[]\n'
398
399         resource_pool_id1 = str(uuid.uuid4())
400         resp = client.get(apibase+"/resourcePools/" +
401                           resource_pool_id1+"/resources")
402         assert resp.get_data() == b'[]\n'
403
404         resp = client.get(apibase+"/deploymentManagers")
405         assert resp.get_data() == b'[]\n'
406
407         resp = client.get(apibase+"/subscriptions")
408         assert resp.get_data() == b'[]\n'
409
410
411 def test_flask_get_one(mock_flask_uow):
412     session, app = mock_flask_uow
413
414     session.return_value.query.return_value.filter_by.return_value.\
415         first.return_value = None
416     apibase = config.get_o2ims_api_base() + '/v1'
417
418     with app.test_client() as client:
419         # Get one and return 404
420         ###########################
421         resp = client.get(apibase+"/")
422         assert resp.status_code == 404
423
424         resource_type_id1 = str(uuid.uuid4())
425         resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
426         assert resp.status_code == 404
427
428         resource_pool_id1 = str(uuid.uuid4())
429         resp = client.get(apibase+"/resourcePools/"+resource_pool_id1)
430         assert resp.status_code == 404
431
432         resource_id1 = str(uuid.uuid4())
433         resp = client.get(apibase+"/resourcePools/" +
434                           resource_pool_id1+"/resources/"+resource_id1)
435         assert resp.status_code == 404
436
437         deployment_manager_id1 = str(uuid.uuid4())
438         resp = client.get(apibase+"/deploymentManagers/" +
439                           deployment_manager_id1)
440         assert resp.status_code == 404
441
442         subscription_id1 = str(uuid.uuid4())
443         resp = client.get(apibase+"/subscriptions/"+subscription_id1)
444         assert resp.status_code == 404
445
446
447 def test_flask_post(mock_flask_uow):
448     session, app = mock_flask_uow
449     apibase = config.get_o2ims_api_base() + '/v1'
450
451     with app.test_client() as client:
452         session.return_value.execute.return_value = []
453
454         sub_callback = 'http://subscription/callback/url'
455         resp = client.post(apibase+'/subscriptions', json={
456             'callback': sub_callback,
457             'consumerSubscriptionId': 'consumerSubId1',
458             'filter': 'empty'
459         })
460         assert resp.status_code == 201
461         assert 'subscriptionId' in resp.get_json()
462
463
464 def test_flask_delete(mock_flask_uow):
465     session, app = mock_flask_uow
466     apibase = config.get_o2ims_api_base() + '/v1'
467
468     with app.test_client() as client:
469         session.return_value.execute.return_value.first.return_value = {}
470
471         subscription_id1 = str(uuid.uuid4())
472         resp = client.delete(apibase+"/subscriptions/"+subscription_id1)
473         assert resp.status_code == 204
474
475
476 def test_flask_not_allowed(mock_flask_uow):
477     _, app = mock_flask_uow
478     apibase = config.get_o2ims_api_base() + '/v1'
479
480     with app.test_client() as client:
481         # Testing resource type not support method
482         ##########################
483         uri = apibase + "/resourceTypes"
484         resp = client.post(uri)
485         assert resp.status == '405 METHOD NOT ALLOWED'
486         resp = client.put(uri)
487         assert resp.status == '405 METHOD NOT ALLOWED'
488         resp = client.patch(uri)
489         assert resp.status == '405 METHOD NOT ALLOWED'
490         resp = client.delete(uri)
491         assert resp.status == '405 METHOD NOT ALLOWED'
492
493         resource_type_id1 = str(uuid.uuid4())
494         uri = apibase + "/resourceTypes/" + resource_type_id1
495         resp = client.post(uri)
496         assert resp.status == '405 METHOD NOT ALLOWED'
497         resp = client.put(uri)
498         assert resp.status == '405 METHOD NOT ALLOWED'
499         resp = client.patch(uri)
500         assert resp.status == '405 METHOD NOT ALLOWED'
501         resp = client.delete(uri)
502         assert resp.status == '405 METHOD NOT ALLOWED'
503
504         # Testing resource pool not support method
505         ##########################
506         uri = apibase + "/resourcePools"
507         resp = client.post(uri)
508         assert resp.status == '405 METHOD NOT ALLOWED'
509         resp = client.put(uri)
510         assert resp.status == '405 METHOD NOT ALLOWED'
511         resp = client.patch(uri)
512         assert resp.status == '405 METHOD NOT ALLOWED'
513         resp = client.delete(uri)
514         assert resp.status == '405 METHOD NOT ALLOWED'
515
516         resource_pool_id1 = str(uuid.uuid4())
517         uri = apibase + "/resourcePools/" + resource_pool_id1
518         resp = client.post(uri)
519         assert resp.status == '405 METHOD NOT ALLOWED'
520         resp = client.put(uri)
521         assert resp.status == '405 METHOD NOT ALLOWED'
522         resp = client.patch(uri)
523         assert resp.status == '405 METHOD NOT ALLOWED'
524         resp = client.delete(uri)
525         assert resp.status == '405 METHOD NOT ALLOWED'
526
527         # Testing resource not support method
528         ##########################
529         uri = apibase + "/resourcePools/" + resource_pool_id1 + "/resources"
530         resp = client.post(uri)
531         assert resp.status == '405 METHOD NOT ALLOWED'
532         resp = client.put(uri)
533         assert resp.status == '405 METHOD NOT ALLOWED'
534         resp = client.patch(uri)
535         assert resp.status == '405 METHOD NOT ALLOWED'
536         resp = client.delete(uri)
537         assert resp.status == '405 METHOD NOT ALLOWED'
538
539         resource_id1 = str(uuid.uuid4())
540         uri = apibase + "/resourcePools/" + \
541             resource_pool_id1 + "/resources/" + resource_id1
542         resp = client.post(uri)
543         assert resp.status == '405 METHOD NOT ALLOWED'
544         resp = client.put(uri)
545         assert resp.status == '405 METHOD NOT ALLOWED'
546         resp = client.patch(uri)
547         assert resp.status == '405 METHOD NOT ALLOWED'
548         resp = client.delete(uri)
549         assert resp.status == '405 METHOD NOT ALLOWED'
550
551         # Testing deployment managers not support method
552         ##########################
553         uri = apibase + "/deploymentManagers"
554         resp = client.post(uri)
555         assert resp.status == '405 METHOD NOT ALLOWED'
556         resp = client.put(uri)
557         assert resp.status == '405 METHOD NOT ALLOWED'
558         resp = client.patch(uri)
559         assert resp.status == '405 METHOD NOT ALLOWED'
560         resp = client.delete(uri)
561         assert resp.status == '405 METHOD NOT ALLOWED'
562
563         deployment_manager_id1 = str(uuid.uuid4())
564         uri = apibase + "/deploymentManagers/" + deployment_manager_id1
565         resp = client.post(uri)
566         assert resp.status == '405 METHOD NOT ALLOWED'
567         resp = client.put(uri)
568         assert resp.status == '405 METHOD NOT ALLOWED'
569         resp = client.patch(uri)
570         assert resp.status == '405 METHOD NOT ALLOWED'
571         resp = client.delete(uri)
572         assert resp.status == '405 METHOD NOT ALLOWED'
573
574         # Testing subscriptions not support method
575         ##########################
576         uri = apibase + "/subscriptions"
577         resp = client.put(uri)
578         assert resp.status == '405 METHOD NOT ALLOWED'
579         resp = client.patch(uri)
580         assert resp.status == '405 METHOD NOT ALLOWED'
581         resp = client.delete(uri)
582         assert resp.status == '405 METHOD NOT ALLOWED'
583
584         subscription_id1 = str(uuid.uuid4())
585         uri = apibase + "/subscriptions/" + subscription_id1
586         resp = client.post(uri)
587         assert resp.status == '405 METHOD NOT ALLOWED'
588         resp = client.put(uri)
589         assert resp.status == '405 METHOD NOT ALLOWED'
590         resp = client.patch(uri)
591         assert resp.status == '405 METHOD NOT ALLOWED'