1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from http import HTTPStatus
20 from requests import post as requests_post
21 from requests.auth import HTTPBasicAuth
22 from requests.exceptions import HTTPError
23 from jwt import decode as jwt_decode
24 from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
26 from o2common.authmw.exceptions import AuthRequiredExp
27 from o2common.authmw.exceptions import AuthFailureExp
28 from o2common.config.config import get_auth_provider, get_review_url
29 from o2common.config.config import get_reviewer_token
30 from o2common.config import conf
31 from o2common.helper import o2logging
# NOTE(review): this swaps the ssl module's default HTTPS context factory for
# the unverified one, so HTTPS requests made through urllib in this process
# skip certificate verification. Presumably done so the K8s API endpoint can
# be reached with a self-signed certificate -- confirm this is intended.
ssl._create_default_https_context = ssl._create_unverified_context
# Module-level logger named after this module.
logger = o2logging.get_logger(__name__)
37 class OAuthAuthenticationException(Exception):
38 def __init__(self, value):
42 class K8SAuthenticaException(Exception):
43 def __init__(self, value):
47 class K8SAuthorizationException(Exception):
48 def __init__(self, value):
54 def __init__(self, name):
57 # read the conf from config file
58 auth_prv_conf = get_auth_provider()
59 if auth_prv_conf == 'k8s':
60 self.obj = k8s_auth_provider('k8s')
62 self.obj = oauth2_auth_provider('oauth2')
65 return self.obj.tokenissue()
def sanity_check(self):
    """Run the sanity check of the wrapped auth provider.

    The call is forwarded to the provider object held in ``self.obj``;
    its return value (and any exception it raises) is passed through
    unchanged.
    """
    provider = self.obj
    return provider.sanity_check()
def authenticate(self, token):
    """Authenticate ``token`` through the wrapped auth provider.

    The token is handed as-is to the ``authenticate`` method of the
    provider object held in ``self.obj``; the provider's result (or
    exception) is passed through unchanged.
    """
    provider = self.obj
    return provider.authenticate(token)
def __repr__(self) -> str:
    """Return a debug representation that shows this definer's name.

    The name is wrapped in a one-element tuple before %-formatting so it
    is always treated as a single value. With a bare ``% self.name`` a
    tuple-valued name would be unpacked as the format arguments, which
    either drops the tuple wrapper or raises ``TypeError``.
    """
    return "<auth_definer: name = %s>" % (self.name,)
77 class k8s_auth_provider(auth_definer):
79 def __init__(self, name):
82 self.token_review_url = get_review_url()
84 raise Exception('Failed to get k8s token review url.')
86 def tokenissue(self, **args2):
89 def sanity_check(self):
91 self.authenticate('faketoken')
92 except Exception as ex:
94 'Failed to bootstrap oauth middleware with exp: ' + str(ex))
95 raise Exception(str(ex))
97 def authenticate(self, token):
''' Call Kubernetes API to authenticate '''
99 reviewer_token = get_reviewer_token()
101 "kind": "TokenReview",
102 "apiVersion": "authentication.k8s.io/v1",
104 "creationTimestamp": None
113 datas = json.dumps(tokenreview)
114 binary_data = datas.encode('utf-8')
116 header = {'Authorization': 'Bearer '+reviewer_token,
117 'Content-Type': 'application/json'}
119 req = urllib.request.Request(
120 self.token_review_url, data=binary_data, headers=header)
121 response = urllib.request.urlopen(req)
122 data = json.load(response)
123 if data['status']['authenticated'] is True:
124 logger.debug("Authenticated.")
126 except Exception as ex:
129 "Invoke K8s API Service Exception happened:" + strex)
131 raise K8SAuthorizationException(
132 'No privilege to perform oauth token check.')
134 raise K8SAuthenticaException(
135 'Self Authentication failure.')
138 def tokenrevoke(self, **args2):
142 class oauth2_auth_provider(auth_definer):
143 def __init__(self, name):
146 def _format_public_key(self):
147 public_key_string = """-----BEGIN PUBLIC KEY----- \
149 -----END PUBLIC KEY-----""" % conf.OAUTH2.oauth2_public_key
150 return public_key_string
152 def _verify_jwt_token_introspect(self, token):
153 introspect_endpoint = conf.OAUTH2.oauth2_introspection_endpoint
154 client_id = conf.OAUTH2.oauth2_client_id
155 client_secret = conf.OAUTH2.oauth2_client_secret
157 response = requests_post(
159 data={'token': token, 'client_id': client_id},
160 auth=HTTPBasicAuth(client_id, client_secret)
162 except HTTPError as e:
163 logger.error('OAuth2 jwt token introspect verify failed.')
164 raise Exception(str(e))
165 if response.status_code == HTTPStatus.OK:
166 introspection_data = response.json()
167 if introspection_data.get('active'):
168 logger.info('OAuth2 jwt token introspect result active.')
170 logger.info('OAuth2 jwt token introspect verify failed.')
173 def _verify_jwt_token(self, token):
174 algorithm = conf.OAUTH2.oauth2_algorithm
175 public_key_string = self._format_public_key()
177 options = {"verify_signature": True, "verify_aud": False,
179 decoded_token = jwt_decode(token, public_key_string,
180 algorithms=[algorithm], options=options)
182 'Verified Token from client: %s' %
183 decoded_token.get("clientHost"))
185 except (ExpiredSignatureError,
186 InvalidTokenError) as e:
187 logger.error(f'OAuth2 jwt token validation failed: {e}')
188 raise AuthFailureExp(
189 'OAuth2 JWT Token Authentication failure.')
190 except Exception as e:
191 raise AuthRequiredExp(str(e))
193 def authenticate(self, token):
194 ''' Call the JWT to authenticate
196 If the verify type is introspection, call introspection endpoint to
198 If the verify type is jwt, call JWT SDK to verify the token.
200 oauth2_verify_type = conf.OAUTH2.oauth2_verify_type
201 if oauth2_verify_type == 'introspection':
202 return self._verify_jwt_token_introspect(token)
203 elif oauth2_verify_type == 'jwt':
204 return self._verify_jwt_token(token)
207 def sanity_check(self):