1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import http.client
import os
import ssl

import requests
from oauthlib.oauth2 import LegacyApplicationClient
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException, SSLError
from requests.packages.urllib3.util.retry import Retry
from requests_oauthlib import OAuth2Session

from o2common.config import config
from o2common.helper import o2logging
# Module-level logger used by the client methods below for warnings/errors.
logger = o2logging.get_logger(__name__)
def get_http_conn(callbackurl):
    """Create a plain (unencrypted) HTTP connection object.

    Args:
        callbackurl: Value handed to ``http.client.HTTPConnection`` as its
            host argument.
            NOTE(review): presumably ``host[:port]`` rather than a full
            URL -- confirm against callers.

    Returns:
        http.client.HTTPConnection: The newly created connection object.
    """
    conn = http.client.HTTPConnection(callbackurl)
    # Hand the connection back to the caller; without this the getter
    # would build the object and silently discard it.
    return conn
def get_https_conn_default(callbackurl):
    """Create an HTTPS connection verified against the default CA store.

    The SSL context requires a valid server certificate and checks that
    the certificate matches the host name.

    Args:
        callbackurl: Value handed to ``http.client.HTTPSConnection`` as its
            host argument.
            NOTE(review): presumably ``host[:port]`` rather than a full
            URL -- confirm against callers.

    Returns:
        http.client.HTTPSConnection: The newly created connection object.
    """
    sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    # Stated explicitly so the verification policy is visible here and does
    # not depend on the defaults of create_default_context().
    sslctx.check_hostname = True
    sslctx.verify_mode = ssl.CERT_REQUIRED
    sslctx.load_default_certs()
    conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
    # Hand the connection back to the caller; without this the getter
    # would build the object and silently discard it.
    return conn
def get_https_conn_selfsigned(callbackurl):
    """Create an HTTPS connection that trusts the configured SMO CA file.

    The CA path is read from ``config.get_smo_ca_config_path()`` and loaded
    into the SSL context as an additional trust anchor. A certificate is
    still required from the server, but host name matching is switched off.

    Args:
        callbackurl: Value handed to ``http.client.HTTPSConnection`` as its
            host argument.
            NOTE(review): presumably ``host[:port]`` rather than a full
            URL -- confirm against callers.

    Returns:
        http.client.HTTPSConnection: The newly created connection object.
    """
    sslctx = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH)
    smo_ca_path = config.get_smo_ca_config_path()
    sslctx.load_verify_locations(smo_ca_path)
    # NOTE(review): host name checking is disabled, presumably because
    # self-signed SMO certificates do not carry a matching name. The chain
    # is still validated (CERT_REQUIRED), but any host presenting a
    # certificate signed by a trusted CA will be accepted.
    sslctx.check_hostname = False
    sslctx.verify_mode = ssl.CERT_REQUIRED
    conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
    # Hand the connection back to the caller; without this the getter
    # would build the object and silently discard it.
    return conn
def __init__(self, client_id=None, token_url=None, username=None,
             password=None, scope=None, retries=3, use_oauth=False):
    """Set up the HTTP session, optionally authenticated through OAuth2.

    With ``use_oauth`` enabled, an ``OAuth2Session`` using the password
    flow (``LegacyApplicationClient``) is created and an access token is
    fetched immediately. Otherwise a plain ``requests.Session`` is used.
    In both cases a retrying ``HTTPAdapter`` is mounted for ``http://``
    and ``https://``.

    Args:
        client_id: OAuth2 client identifier.
        token_url: URL of the OAuth2 token endpoint.
        username: User name for the password flow.
        password: Password for the password flow.
        scope: Requested scopes; stored as an empty list when falsy.
        retries: Retry budget stored on the instance and applied to the
            mounted adapter.
        use_oauth: Whether to authenticate the session through OAuth2.

    Raises:
        ValueError: If ``use_oauth`` is true and any of ``client_id``,
            ``token_url``, ``username`` or ``password`` is missing.
    """
    self.client_id = client_id
    self.token_url = token_url
    self.username = username
    self.password = password
    self.scope = scope if scope else []
    self.use_oauth = use_oauth
    self.retries = retries

    if self.use_oauth:
        if not all([self.client_id, self.token_url, self.username,
                    self.password]):
            raise ValueError(
                'client_id, token_url, username, and password ' +
                'must be provided when use_oauth is True.')

        # Set OAUTHLIB_INSECURE_TRANSPORT environment variable
        # if token_url uses http. This is a process-wide side effect.
        if 'http://' in self.token_url:
            os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'

        # Create a LegacyApplicationClient for handling password flow
        client = LegacyApplicationClient(client_id=self.client_id)
        self.session = OAuth2Session(client=client)

        # Check if token_url uses https and set SSL verification:
        # prefer the configured SMO CA file when it exists on disk.
        if 'https://' in self.token_url:
            ca_path = config.get_smo_ca_config_path()
            if os.path.exists(ca_path):
                self.session.verify = ca_path
            else:
                self.session.verify = True

        # Fetch the access token
        self.fetch_token(self.session.verify)
    else:
        self.session = requests.Session()

    # Create a Retry object for handling retries. The budget comes from
    # the constructor argument, and a backoff is applied so a throttling
    # or overloaded server (429/503) is not retried immediately.
    retry_strategy = Retry(
        total=self.retries,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    self.session.mount("https://", adapter)
    self.session.mount("http://", adapter)
def fetch_token(self, verify):
    """Fetch an OAuth2 access token for ``self.session``.

    The token request is sent to ``self.token_url`` with the stored
    username, password and client id. If the request fails with an
    ``SSLError`` and verification was not already disabled, it is
    attempted once more with ``verify=False``.

    Args:
        verify: TLS verification setting forwarded to the session's
            ``fetch_token`` call (``True``, ``False`` or a CA bundle
            path).

    Returns:
        None. If the unverified attempt also raises ``SSLError`` the
        error is only logged and no further attempt is made.
    """
    try:
        self.session.fetch_token(
            token_url=self.token_url,
            username=self.username,
            password=self.password,
            client_id=self.client_id,
            verify=verify
        )
    except SSLError:
        # If SSLError is raised, try again with verify=False
        # NOTE(review): this sends the password credentials over a TLS
        # channel whose peer is not authenticated -- confirm this
        # fallback is acceptable for the deployment.
        logger.warning('The SSLError occurred')
        if verify is not False:
            self.fetch_token(verify=False)
def handle_post_data(self, resp):
    """Classify a POST response as success or failure.

    Args:
        resp: Response object exposing a numeric ``status_code``.

    Returns:
        bool: ``True`` for a 2xx status code; ``False`` otherwise, after
        logging the status code as an error.
    """
    if 200 <= resp.status_code < 300:
        return True
    logger.error('Response code is: {}'.format(resp.status_code))
    # TODO: write the status to extension db table.
    return False
def post(self, url, data, retries=1):
    """POST ``data`` as JSON to ``url``, retrying on request errors.

    For ``https://`` URLs the session verifies the server against the
    configured SMO CA file when that file exists, and against the default
    trust store otherwise.

    Args:
        url: Target URL.
        data: Payload sent as the JSON body.
        retries: Number of attempts; ``None`` means use ``self.retries``.

    Returns:
        The result of ``self.handle_post_data`` for the first response
        that does not raise.

    Raises:
        ValueError: If ``url`` or ``data`` is missing or empty.
        Exception: If every attempt failed with a request error; the last
            error is chained as the cause.
    """
    if not all([url, data]):
        raise ValueError(
            'url, data must be provided when call the post.')

    # Check if url uses https and set SSL verification
    if 'https://' in url:
        ca_path = config.get_smo_ca_config_path()
        if os.path.exists(ca_path):
            self.session.verify = ca_path
        else:
            self.session.verify = True

    # Only fall back to the instance-wide budget when the caller did not
    # choose one; an explicit value must not be overwritten.
    if retries is None:
        retries = self.retries

    last_error = None
    for _ in range(retries):
        try:
            response = self.session.post(url, json=data)
            response.raise_for_status()
            return self.handle_post_data(response)
        except (SSLError, RequestException) as e:
            logger.warning(f'Error occurred: {e}')
            last_error = e
    # Every attempt failed: surface the failure with the last error as
    # its cause instead of returning None silently.
    raise Exception(
        f"POST request to {url} failed after {retries} retries."
    ) from last_error