+ def _verify_jwt_token_introspect(self, token):
+ introspect_endpoint = conf.OAUTH2.oauth2_introspection_endpoint
+ client_id = conf.OAUTH2.oauth2_client_id
+ client_secret = conf.OAUTH2.oauth2_client_secret
+ try:
+ response = requests_post(
+ introspect_endpoint,
+ data={'token': token, 'client_id': client_id},
+ auth=HTTPBasicAuth(client_id, client_secret)
+ )
+ except HTTPError as e:
+ logger.error('OAuth2 jwt token introspect verify failed.')
+ raise Exception(str(e))
+ if response.status_code == HTTPStatus.OK:
+ introspection_data = response.json()
+ if introspection_data.get('active'):
+ logger.info('OAuth2 jwt token introspect result active.')
+ return True
+ logger.info('OAuth2 jwt token introspect verify failed.')