Merge "Update chart to support the deployment without persistent database"
[pti/o2.git] / o2common / authmw / authprov.py
1 # Copyright (C) 2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import ssl
16 from o2common.helper import o2logging
17 import urllib.request
18 import urllib.parse
19 import json
20
21 from o2common.config.config import get_auth_provider, get_review_url
22 from o2common.config.config import get_reviewer_token
23
24 ssl._create_default_https_context = ssl._create_unverified_context
25 logger = o2logging.get_logger(__name__)
26
27
28 class K8SAuthenticaException(Exception):
29     def __init__(self, value):
30         self.value = value
31
32
33 class K8SAuthorizationException(Exception):
34     def __init__(self, value):
35         self.value = value
36
37
38 class auth_definer():
39
40     def __init__(self, name):
41         super().__init__()
42         self.name = name
43         # read the conf from config file
44         auth_prv_conf = get_auth_provider()
45         if auth_prv_conf == 'k8s':
46             self.obj = k8s_auth_provider('k8s')
47         else:
48             self.obj = keystone_auth_provider('keystone')
49
50     def tokenissue(self):
51         return self.obj.tokenissue()
52
53     def sanity_check(self):
54         return self.obj.sanity_check()
55
56     # call k8s api
57     def authenticate(self, token):
58         return self.obj.authenticate(token)
59
60     def __repr__(self) -> str:
61         return "<auth_definer: name = %s>" % self.name
62
63
64 class k8s_auth_provider(auth_definer):
65
66     def __init__(self, name):
67         self.name = name
68         try:
69             self.token_review_url = get_review_url()
70         except Exception:
71             raise Exception('Failed to get k8s token review url.')
72
73     def tokenissue(self, **args2):
74         pass
75
76     def sanity_check(self):
77         try:
78             self.authenticate('faketoken')
79         except Exception as ex:
80             logger.critical(
81                 'Failed to bootstrap oauth middleware with exp: ' + str(ex))
82             raise Exception(str(ex))
83
84     def authenticate(self, token):
85         reviewer_token = get_reviewer_token()
86         tokenreview = {
87             "kind": "TokenReview",
88             "apiVersion": "authentication.k8s.io/v1",
89             "metadata": {
90                 "creationTimestamp": None
91             },
92             "spec": {
93                 "token": ""+token
94             },
95             "status": {
96                 "user": {}
97             }
98         }
99         datas = json.dumps(tokenreview)
100         binary_data = datas.encode('utf-8')
101         # 'post' method
102         header = {'Authorization': 'Bearer '+reviewer_token,
103                   'Content-Type': 'application/json'}
104         try:
105             req = urllib.request.Request(
106                 self.token_review_url, data=binary_data, headers=header)
107             response = urllib.request.urlopen(req)
108             data = json.load(response)
109             if data['status']['authenticated'] is True:
110                 logger.debug("Authenticated.")
111                 return True
112         except Exception as ex:
113             strex = str(ex)
114             logger.warning(
115                 "Invoke K8s API Service Exception happened:" + strex)
116             if '403' in strex:
117                 raise K8SAuthorizationException(
118                     'No privilege to perform oauth token check.')
119             elif '401' in strex:
120                 raise K8SAuthenticaException(
121                     'Self Authentication failure.')
122         return False
123
124     def tokenrevoke(self, **args2):
125         return True
126
127
128 class keystone_auth_provider(auth_definer):
129     def __init__(self, name):
130         self.name = name
131
132     def tokenissue(self, *args1, **args2):
133         pass
134
135     def authenticate(self, *args1, **args2):
136         return False
137
138     def sanity_check(self):
139         pass
140
141     def tokenrevoke(self, *args1, **args2):
142         return False