1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from o2common.helper import o2logging
21 from o2common.config.config import get_auth_provider, get_review_url
22 from o2common.config.config import get_reviewer_token
24 ssl._create_default_https_context = ssl._create_unverified_context
25 logger = o2logging.get_logger(__name__)
27 # read the conf from config file
28 auth_prv_conf = get_auth_provider()
31 token_review_url = get_review_url()
33 raise Exception('Get k8s token review url failed')
36 class K8SAuthenticaException(Exception):
37 def __init__(self, value):
41 class K8SAuthorizationException(Exception):
42 def __init__(self, value):
48 def __init__(self, name):
51 if auth_prv_conf == 'k8s':
52 self.obj = k8s_auth_provider('k8s')
54 self.obj = keystone_auth_provider('keystone')
57 return self.obj.tokenissue()
59 def sanity_check(self):
60 return self.obj.sanity_check()
63 def authenticate(self, token):
64 return self.obj.authenticate(token)
66 def __repr__(self) -> str:
67 return "<auth_definer: name = %s>" % self.name
70 class k8s_auth_provider(auth_definer):
72 def __init__(self, name):
75 def tokenissue(self, **args2):
78 def sanity_check(self):
80 self.authenticate('faketoken')
81 except Exception as ex:
83 'Failed to bootstrap oauth middleware with exp: ' + str(ex))
84 raise Exception(str(ex))
86 def authenticate(self, token):
87 reviewer_token = get_reviewer_token()
89 "kind": "TokenReview",
90 "apiVersion": "authentication.k8s.io/v1",
92 "creationTimestamp": None
101 datas = json.dumps(tokenreview)
102 binary_data = datas.encode('utf-8')
104 header = {'Authorization': 'Bearer '+reviewer_token,
105 'Content-Type': 'application/json'}
107 req = urllib.request.Request(
108 token_review_url, data=binary_data, headers=header)
109 response = urllib.request.urlopen(req)
110 data = json.load(response)
111 if data['status']['authenticated'] is True:
112 logger.debug("Authenticated.")
114 except Exception as ex:
117 "Invoke K8s API Service Exception happened:" + strex)
119 raise K8SAuthorizationException(
120 'No privilege to perform oauth token check.')
122 raise K8SAuthenticaException(
123 'Self Authentication failure.')
126 def tokenrevoke(self, **args2):
130 class keystone_auth_provider(auth_definer):
131 def __init__(self, name):
134 def tokenissue(self, *args1, **args2):
137 def authenticate(self, *args1, **args2):
140 def sanity_check(self):
143 def tokenrevoke(self, *args1, **args2):