Fix distributed cloud sync the wrong label cloud; update the werkzeug version
[pti/o2.git] / o2common / authmw / authprov.py
1 # Copyright (C) 2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import ssl
16 from o2common.helper import o2logging
17 import urllib.request
18 import urllib.parse
19 import json
20
21 from o2common.config.config import get_auth_provider, get_review_url
22 from o2common.config.config import get_reviewer_token
23
24 ssl._create_default_https_context = ssl._create_unverified_context
25 logger = o2logging.get_logger(__name__)
26
27 # read the conf from config file
28 auth_prv_conf = get_auth_provider()
29
30 try:
31     token_review_url = get_review_url()
32 except Exception:
33     raise Exception('Get k8s token review url failed')
34
35
36 class K8SAuthenticaException(Exception):
37     def __init__(self, value):
38         self.value = value
39
40
41 class K8SAuthorizationException(Exception):
42     def __init__(self, value):
43         self.value = value
44
45
46 class auth_definer():
47
48     def __init__(self, name):
49         super().__init__()
50         self.name = name
51         if auth_prv_conf == 'k8s':
52             self.obj = k8s_auth_provider('k8s')
53         else:
54             self.obj = keystone_auth_provider('keystone')
55
56     def tokenissue(self):
57         return self.obj.tokenissue()
58
59     def sanity_check(self):
60         return self.obj.sanity_check()
61
62     # call k8s api
63     def authenticate(self, token):
64         return self.obj.authenticate(token)
65
66     def __repr__(self) -> str:
67         return "<auth_definer: name = %s>" % self.name
68
69
70 class k8s_auth_provider(auth_definer):
71
72     def __init__(self, name):
73         self.name = name
74
75     def tokenissue(self, **args2):
76         pass
77
78     def sanity_check(self):
79         try:
80             self.authenticate('faketoken')
81         except Exception as ex:
82             logger.critical(
83                 'Failed to bootstrap oauth middleware with exp: ' + str(ex))
84             raise Exception(str(ex))
85
86     def authenticate(self, token):
87         reviewer_token = get_reviewer_token()
88         tokenreview = {
89             "kind": "TokenReview",
90             "apiVersion": "authentication.k8s.io/v1",
91             "metadata": {
92                 "creationTimestamp": None
93             },
94             "spec": {
95                 "token": ""+token
96             },
97             "status": {
98                 "user": {}
99             }
100         }
101         datas = json.dumps(tokenreview)
102         binary_data = datas.encode('utf-8')
103         # 'post' method
104         header = {'Authorization': 'Bearer '+reviewer_token,
105                   'Content-Type': 'application/json'}
106         try:
107             req = urllib.request.Request(
108                 token_review_url, data=binary_data, headers=header)
109             response = urllib.request.urlopen(req)
110             data = json.load(response)
111             if data['status']['authenticated'] is True:
112                 logger.debug("Authenticated.")
113                 return True
114         except Exception as ex:
115             strex = str(ex)
116             logger.warning(
117                 "Invoke K8s API Service Exception happened:" + strex)
118             if '403' in strex:
119                 raise K8SAuthorizationException(
120                     'No privilege to perform oauth token check.')
121             elif '401' in strex:
122                 raise K8SAuthenticaException(
123                     'Self Authentication failure.')
124         return False
125
126     def tokenrevoke(self, **args2):
127         return True
128
129
130 class keystone_auth_provider(auth_definer):
131     def __init__(self, name):
132         self.name = name
133
134     def tokenissue(self, *args1, **args2):
135         pass
136
137     def authenticate(self, *args1, **args2):
138         return False
139
140     def sanity_check(self):
141         pass
142
143     def tokenrevoke(self, *args1, **args2):
144         return False