+ def _format_public_key(self):
+ public_key_string = """-----BEGIN PUBLIC KEY----- \
+ %s \
+ -----END PUBLIC KEY-----""" % conf.OAUTH2.oauth2_public_key
+ return public_key_string
+
+ def _verify_jwt_token_introspect(self, token):
+ introspect_endpoint = conf.OAUTH2.oauth2_introspection_endpoint
+ client_id = conf.OAUTH2.oauth2_client_id
+ client_secret = conf.OAUTH2.oauth2_client_secret
+ try:
+ response = requests_post(
+ introspect_endpoint,
+ data={'token': token, 'client_id': client_id},
+ auth=HTTPBasicAuth(client_id, client_secret)
+ )
+ except HTTPError as e:
+ logger.error('OAuth2 jwt token introspect verify failed.')
+ raise Exception(str(e))
+ if response.status_code == HTTPStatus.OK:
+ introspection_data = response.json()
+ if introspection_data.get('active'):
+ logger.info('OAuth2 jwt token introspect result active.')
+ return True
+ logger.info('OAuth2 jwt token introspect verify failed.')
+ return False
+
+ def _verify_jwt_token(self, token):
+ algorithm = conf.OAUTH2.oauth2_algorithm
+ public_key_string = self._format_public_key()
+ try:
+ options = {"verify_signature": True, "verify_aud": False,
+ "exp": True}
+ decoded_token = jwt_decode(token, public_key_string,
+ algorithms=[algorithm], options=options)
+ logger.info(
+ 'Verified Token from client: %s' %
+ decoded_token.get("clientHost"))
+ return True
+ except (ExpiredSignatureError,
+ InvalidTokenError) as e:
+ logger.error(f'OAuth2 jwt token validation failed: {e}')
+ raise AuthFailureExp(
+ 'OAuth2 JWT Token Authentication failure.')
+ except Exception as e:
+ raise AuthRequiredExp(str(e))
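Review bot: The `try` around `requests_post` in `_verify_jwt_token_introspect` catches only `HTTPError`, which `requests` raises solely from `raise_for_status()` (never called here), so a connection failure or timeout escapes as an unhandled `ConnectionError`/`Timeout` rather than the logged `Exception` the handler intends, and the call has no `timeout=`, so a stalled introspection endpoint blocks the request thread indefinitely. Catch the base class and bound the call — `except RequestException as e:` (import it alongside `HTTPError` from `requests.exceptions`) and `timeout=INTROSPECT_TIMEOUT_SECONDS,  # placeholder, pick a project value` in the post — and note that a non-200 reply such as a 401 for bad client credentials is currently logged at info as a plain "verify failed", so recording `response.status_code` at warning in that branch would keep misconfiguration visible to operators. In `_verify_jwt_token`, `"exp": True` is not a PyJWT option key (the key is `verify_exp`) and is silently ignored; expiry is still checked by default, but it should read `"verify_exp": True`, and `"verify_aud": False` accepts tokens minted for any audience, so if the provider sets `aud` pass `audience=` and drop that override. In `_format_public_key`, the trailing backslashes inside the triple-quoted string are line continuations, so header, key and footer end up on one space-separated line instead of the separate lines PEM expects; unless the configured value already carries its own newlines the key is unlikely to parse (failing closed through the broad `except Exception`), and `'\n'.join(['-----BEGIN PUBLIC KEY-----', conf.OAUTH2.oauth2_public_key, '-----END PUBLIC KEY-----'])` restores the intended layout. The enclosing class starts outside the hunk.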