# limitations under the License.
import ssl
-from o2common.helper import o2logging
import urllib.request
import urllib.parse
import json
-
+from http import HTTPStatus
+from requests import post as requests_post
+from requests.auth import HTTPBasicAuth
+from requests.exceptions import HTTPError
+from jwt import decode as jwt_decode
+from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
+
+from o2common.authmw.exceptions import AuthRequiredExp
+from o2common.authmw.exceptions import AuthFailureExp
from o2common.config.config import get_auth_provider, get_review_url
from o2common.config.config import get_reviewer_token
+from o2common.config import conf
+from o2common.helper import o2logging
ssl._create_default_https_context = ssl._create_unverified_context
logger = o2logging.get_logger(__name__)
-# read the conf from config file
-auth_prv_conf = get_auth_provider()
-try:
- token_review_url = get_review_url()
-except Exception:
- raise Exception('Get k8s token review url failed')
+class OAuthAuthenticationException(Exception):
+ def __init__(self, value):
+ self.value = value
class K8SAuthenticaException(Exception):
def __init__(self, name):
super().__init__()
self.name = name
+ # read the conf from config file
+ auth_prv_conf = get_auth_provider()
if auth_prv_conf == 'k8s':
self.obj = k8s_auth_provider('k8s')
else:
- self.obj = keystone_auth_provider('keystone')
+ self.obj = oauth2_auth_provider('oauth2')
def tokenissue(self):
return self.obj.tokenissue()
def sanity_check(self):
return self.obj.sanity_check()
- # call k8s api
def authenticate(self, token):
return self.obj.authenticate(token)
def __init__(self, name):
self.name = name
+ try:
+ self.token_review_url = get_review_url()
+ except Exception:
+ raise Exception('Failed to get k8s token review url.')
def tokenissue(self, **args2):
pass
raise Exception(str(ex))
def authenticate(self, token):
+ ''' Call Kubenetes API to authenticate '''
reviewer_token = get_reviewer_token()
tokenreview = {
"kind": "TokenReview",
'Content-Type': 'application/json'}
try:
req = urllib.request.Request(
- token_review_url, data=binary_data, headers=header)
+ self.token_review_url, data=binary_data, headers=header)
response = urllib.request.urlopen(req)
data = json.load(response)
if data['status']['authenticated'] is True:
return True
-class keystone_auth_provider(auth_definer):
+class oauth2_auth_provider(auth_definer):
def __init__(self, name):
self.name = name
- def tokenissue(self, *args1, **args2):
- pass
+ def _format_public_key(self):
+ public_key_string = """-----BEGIN PUBLIC KEY----- \
+ %s \
+ -----END PUBLIC KEY-----""" % conf.OAUTH2.oauth2_public_key
+ return public_key_string
+
+ def _verify_jwt_token_introspect(self, token):
+ introspect_endpoint = conf.OAUTH2.oauth2_introspection_endpoint
+ client_id = conf.OAUTH2.oauth2_client_id
+ client_secret = conf.OAUTH2.oauth2_client_secret
+ try:
+ response = requests_post(
+ introspect_endpoint,
+ data={'token': token, 'client_id': client_id},
+ auth=HTTPBasicAuth(client_id, client_secret)
+ )
+ except HTTPError as e:
+ logger.error('OAuth2 jwt token introspect verify failed.')
+ raise Exception(str(e))
+ if response.status_code == HTTPStatus.OK:
+ introspection_data = response.json()
+ if introspection_data.get('active'):
+ logger.info('OAuth2 jwt token introspect result active.')
+ return True
+ logger.info('OAuth2 jwt token introspect verify failed.')
+ return False
+
+ def _verify_jwt_token(self, token):
+ algorithm = conf.OAUTH2.oauth2_algorithm
+ public_key_string = self._format_public_key()
+ try:
+ options = {"verify_signature": True, "verify_aud": False,
+ "exp": True}
+ decoded_token = jwt_decode(token, public_key_string,
+ algorithms=[algorithm], options=options)
+ logger.info(
+ 'Verified Token from client: %s' %
+ decoded_token.get("clientHost"))
+ return True
+ except (ExpiredSignatureError,
+ InvalidTokenError) as e:
+ logger.error(f'OAuth2 jwt token validation failed: {e}')
+ raise AuthFailureExp(
+ 'OAuth2 JWT Token Authentication failure.')
+ except Exception as e:
+ raise AuthRequiredExp(str(e))
- def authenticate(self, *args1, **args2):
+ def authenticate(self, token):
+ ''' Call the JWT to authenticate
+
+ If the verify type is introspection, call introspection endpoint to
+ verify the token.
+ If the verify type is jwt, call JWT SDK to verify the token.
+ '''
+ oauth2_verify_type = conf.OAUTH2.oauth2_verify_type
+ if oauth2_verify_type == 'introspection':
+ return self._verify_jwt_token_introspect(token)
+ elif oauth2_verify_type == 'jwt':
+ return self._verify_jwt_token(token)
return False
def sanity_check(self):
pass
-
- def tokenrevoke(self, *args1, **args2):
- return False