1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from o2common.helper import o2logging
21 from o2common.config.config import get_auth_provider, get_review_url
22 from o2common.config.config import get_reviewer_token
# NOTE(review): this rebinds the ssl module's default HTTPS context factory to
# the unverified one, so HTTPS requests that rely on the default context (such
# as the urllib.request.urlopen call in k8s_auth_provider.authenticate) skip
# certificate verification for the whole process. Confirm this is intentional.
24 ssl._create_default_https_context = ssl._create_unverified_context
# Module-level logger obtained from the project's o2logging helper.
25 logger = o2logging.get_logger(__name__)
# Exception type raised by k8s_auth_provider.authenticate with the message
# 'Self Authentication failure.'.
28 class K8SAuthenticaException(Exception):
# Takes a single `value` argument.
# NOTE(review): the constructor body is not visible in this listing.
29 def __init__(self, value):
# Exception type raised by k8s_auth_provider.authenticate with the message
# 'No privilege to perform oauth token check.'.
33 class K8SAuthorizationException(Exception):
# Takes a single `value` argument.
# NOTE(review): the constructor body is not visible in this listing.
34 def __init__(self, value):
# Constructor: selects the concrete auth provider and stores it on self.obj.
# NOTE(review): the enclosing class header and some lines of this method
# (including whatever is done with `name`) are not visible in this listing.
40 def __init__(self, name):
# Read the configured auth provider name from the project configuration.
43 # read the conf from config file
44 auth_prv_conf = get_auth_provider()
# 'k8s' selects the Kubernetes provider.
45 if auth_prv_conf == 'k8s':
46 self.obj = k8s_auth_provider('k8s')
# NOTE(review): presumably the fallback branch for any other configured value;
# the `else:` line is not visible here - confirm.
48 self.obj = keystone_auth_provider('keystone')
# Delegates token issuing to the selected provider object and returns its result.
# NOTE(review): the `def` line for this statement is not visible in this
# listing; presumably it is the body of auth_definer.tokenissue - confirm.
51 return self.obj.tokenissue()
# Delegates the sanity check to the selected provider object (self.obj) and
# returns whatever that provider's sanity_check() returns.
53 def sanity_check(self):
54 return self.obj.sanity_check()
# Delegates authentication of `token` to the selected provider object
# (self.obj) and returns that provider's result.
57 def authenticate(self, token):
58 return self.obj.authenticate(token)
# Debug representation of the form "<auth_definer: name = NAME>".
# NOTE(review): reads self.name; the assignment of self.name is not visible in
# this listing - confirm it is set in __init__.
60 def __repr__(self) -> str:
61 return "<auth_definer: name = %s>" % self.name
# Auth provider that validates tokens by sending a TokenReview request to the
# configured Kubernetes token review URL.
# NOTE(review): several lines of this class (try/else/if lines, parts of the
# request payload, some method bodies) are not visible in this listing; the
# comments below describe only the visible statements.
64 class k8s_auth_provider(auth_definer):
# Constructor: caches the token review URL from configuration.
66 def __init__(self, name):
69 self.token_review_url = get_review_url()
# NOTE(review): presumably raised when get_review_url() fails; the surrounding
# try/except lines are not visible here - confirm.
71 raise Exception('Failed to get k8s token review url.')
# NOTE(review): body not visible in this listing.
73 def tokenissue(self, **args2):
# Probes the token review path by authenticating a dummy token; any exception
# is re-raised as a plain Exception carrying the original message.
76 def sanity_check(self):
78 self.authenticate('faketoken')
79 except Exception as ex:
# NOTE(review): the call this message is passed to is not visible here
# (presumably a logger call) - confirm.
81 'Failed to bootstrap oauth middleware with exp: ' + str(ex))
82 raise Exception(str(ex))
# Validates `token` against the token review URL, authorising the request with
# the reviewer token read from configuration.
84 def authenticate(self, token):
85 reviewer_token = get_reviewer_token()
# TokenReview request payload.
# NOTE(review): only some entries of the payload are visible in this listing,
# including where `token` is placed - confirm against the full file.
87 "kind": "TokenReview",
88 "apiVersion": "authentication.k8s.io/v1",
90 "creationTimestamp": None
# Serialise the payload to JSON and encode it as UTF-8 bytes for the request body.
99 datas = json.dumps(tokenreview)
100 binary_data = datas.encode('utf-8')
# The reviewer token is sent as a Bearer credential; the body is JSON.
102 header = {'Authorization': 'Bearer '+reviewer_token,
103 'Content-Type': 'application/json'}
# Supplying `data` makes urllib issue the request as a POST.
105 req = urllib.request.Request(
106 self.token_review_url, data=binary_data, headers=header)
107 response = urllib.request.urlopen(req)
# The response body is parsed as JSON; status.authenticated carries the verdict.
108 data = json.load(response)
109 if data['status']['authenticated'] is True:
110 logger.debug("Authenticated.")
# Any failure while calling the service is caught here.
112 except Exception as ex:
# NOTE(review): the definition of `strex` and the call this message is passed
# to are not visible here - confirm.
115 "Invoke K8s API Service Exception happened:" + strex)
# NOTE(review): the conditions selecting between the two exceptions below are
# not visible in this listing - confirm against the full file.
117 raise K8SAuthorizationException(
118 'No privilege to perform oauth token check.')
120 raise K8SAuthenticaException(
121 'Self Authentication failure.')
# NOTE(review): body not visible in this listing.
124 def tokenrevoke(self, **args2):
# Keystone-backed auth provider; auth_definer.__init__ instantiates it with
# the name 'keystone' as the alternative to the 'k8s' provider.
# NOTE(review): only the method signatures are visible in this listing; none
# of the method bodies can be described from here.
128 class keystone_auth_provider(auth_definer):
# Constructor taking the provider name.
129 def __init__(self, name):
# Accepts arbitrary positional and keyword arguments.
132 def tokenissue(self, *args1, **args2):
# Accepts arbitrary positional and keyword arguments.
135 def authenticate(self, *args1, **args2):
# Takes no arguments besides self.
138 def sanity_check(self):
# Accepts arbitrary positional and keyword arguments.
141 def tokenrevoke(self, *args1, **args2):