Fix the wrong format of registration and subscription with SMO
[pti/o2.git] / o2common / service / command / handler.py
1 # Copyright (C) 2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import os
16 import requests
17 import http.client
18 import ssl
19 from requests_oauthlib import OAuth2Session
20 from oauthlib.oauth2 import LegacyApplicationClient
21 from requests.packages.urllib3.util.retry import Retry
22 from requests.adapters import HTTPAdapter
23 from requests.exceptions import RequestException, SSLError
24
25 from o2common.helper import o2logging
26 from o2common.config import config
27
28 logger = o2logging.get_logger(__name__)
29
30
31 def get_http_conn(callbackurl):
32     conn = http.client.HTTPConnection(callbackurl)
33     return conn
34
35
36 # with default CA
37 def get_https_conn_default(callbackurl):
38     sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
39     sslctx.check_hostname = True
40     sslctx.verify_mode = ssl.CERT_REQUIRED
41     sslctx.load_default_certs()
42     conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
43     return conn
44
45
46 # with self signed ca
47 def get_https_conn_selfsigned(callbackurl):
48     sslctx = ssl.create_default_context(
49         purpose=ssl.Purpose.SERVER_AUTH)
50     smo_ca_path = config.get_smo_ca_config_path()
51     sslctx.load_verify_locations(smo_ca_path)
52     sslctx.check_hostname = False
53     sslctx.verify_mode = ssl.CERT_REQUIRED
54     conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
55     return conn
56
57
58 class SMOClient:
59     def __init__(self, client_id=None, token_url=None, username=None,
60                  password=None, scope=None, retries=3, use_oauth=False):
61         self.client_id = client_id
62         self.token_url = token_url
63         self.username = username
64         self.password = password
65         self.scope = scope if scope else []
66         self.use_oauth = use_oauth
67         self.retries = retries
68
69         if self.use_oauth:
70             if not all([self.client_id, self.token_url, self.username,
71                         self.password]):
72                 raise ValueError(
73                     'client_id, token_url, username, and password ' +
74                     'must be provided when use_oauth is True.')
75
76             # Set OAUTHLIB_INSECURE_TRANSPORT environment variable
77             # if token_url uses http
78             if 'http://' in self.token_url:
79                 os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
80
81             # Create a LegacyApplicationClient for handling password flow
82             client = LegacyApplicationClient(client_id=self.client_id)
83             self.session = OAuth2Session(client=client)
84
85             # Check if token_url uses https and set SSL verification
86             if 'https://' in self.token_url:
87                 ca_path = config.get_smo_ca_config_path()
88                 if os.path.exists(ca_path):
89                     self.session.verify = ca_path
90                 else:
91                     self.session.verify = True
92
93             # Fetch the access token
94             self.fetch_token(self.session.verify)
95         else:
96             self.session = requests.Session()
97
98         # Create a Retry object for handling retries
99         retry_strategy = Retry(
100             total=retries,
101             backoff_factor=1,
102             status_forcelist=[429, 500, 502, 503, 504],
103             allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
104         )
105         adapter = HTTPAdapter(max_retries=retry_strategy)
106         self.session.mount("https://", adapter)
107         self.session.mount("http://", adapter)
108
109     def fetch_token(self, verify):
110         try:
111             self.session.fetch_token(
112                 token_url=self.token_url,
113                 username=self.username,
114                 password=self.password,
115                 client_id=self.client_id,
116                 verify=verify
117             )
118         except SSLError:
119             # If SSLError is raised, try again with verify=False
120             logger.warning('The SSLError occurred')
121             if verify is not False:
122                 self.fetch_token(verify=False)
123
124     def handle_post_data(self, resp):
125         if resp.status_code >= 200 and resp.status_code < 300:
126             return True
127         logger.error('Response code is: {}'.format(resp.status_code))
128         # TODO: write the status to extension db table.
129         return False
130
131     def post(self, url, data, retries=1):
132         if not all([url, data]):
133             raise ValueError(
134                 'url, data must be provided when call the post.')
135
136         # Check if token_url uses https and set SSL verification
137         if 'https://' in url:
138             ca_path = config.get_smo_ca_config_path()
139             if os.path.exists(ca_path):
140                 self.session.verify = ca_path
141             else:
142                 self.session.verify = True
143
144         if retries is None:
145             retries = self.retries
146
147         for _ in range(retries):
148             try:
149                 response = self.session.post(url, json=data)
150                 response.raise_for_status()
151                 return self.handle_post_data(response)
152             except (SSLError, RequestException) as e:
153                 logger.warning(f'Error occurred: {e}')
154                 pass
155         raise Exception(
156             f"POST request to {url} failed after {retries} retries.")