Add oAuth2 for subscription and registration with SMO
[pti/o2.git] / tests / integration / test_common.py
1 # Copyright (C) 2021-2023 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14 import json
15 from urllib.parse import urlparse, urlunparse
16
17 from o2common.service.command.handler import SMOClient
18
19
20 def test_smo_with_oauth2():
21     # Replace these with actual values
22     client_id = 'client_id'
23     token_url = 'http://128.224.115.32:1080/mock_smo/v1/auth/token'
24     username = 'admin'
25     password = 'admin'
26     url = 'http://128.224.115.32:1080/mock_smo/v1/ocloud_observer'
27     data = {"key": "value"}
28
29     client = SMOClient(client_id=client_id, token_url=token_url,
30                        username=username, password=password,
31                        use_oauth=True)
32
33     # Fetch the token
34     client.fetch_token(client.session.verify)
35
36     # Make a POST request
37     response = client.post(url=url, data=json.dumps(data))
38
39     # Check the status code
40     assert response is True
41
42     # Check the response data if you expect any
43     # response_data = json.loads(response.text)
44     # assert response_data == expected_data
45
46     # --------------- HTTPS ---------------- #
47     parsed_token_url = urlparse(token_url)
48     parsed_token_url = parsed_token_url._replace(scheme='https')
49     token_url1 = urlunparse(parsed_token_url)
50
51     parsed_url = urlparse(url)
52     parsed_url = parsed_url._replace(scheme='https')
53     url1 = urlunparse(parsed_url)
54
55     client = SMOClient(client_id=client_id, token_url=token_url1,
56                        username=username, password=password,
57                        use_oauth=True)
58
59     # Fetch the token
60     client.fetch_token(client.session.verify)
61
62     # Make a POST request
63     response = client.post(url=url1, data=json.dumps(data))
64
65     # Check the status code
66     assert response is True
67
68
69 def test_smo_client():
70     url = 'http://128.224.115.32:1080/mock_smo/v1/o2ims_inventory_observer'
71     data = {"key": "value"}
72
73     client = SMOClient()
74
75     # Make a POST request
76     response = client.post(url=url, data=json.dumps(data))
77     # Check the status code
78     assert response is True
79
80     # Check the response data if you expect any
81     # response_data = json.loads(response.text)
82     # assert response_data == expected_data
83
84     parsed_url = urlparse(url)
85     parsed_url = parsed_url._replace(scheme='https')
86     url1 = urlunparse(parsed_url)
87
88     # Make a POST request
89     response = client.post(url=url1, data=json.dumps(data))
90     # Check the status code
91     assert response is True