Merge "OAuth2 support"
[pti/o2.git] / o2common / config / config.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import os
16 import sys
17 import ipaddress
18 from urllib.parse import urlparse
19
20 from o2common import config
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
# True only when the CGTS_INSECURE_SSL environment variable is exactly "1";
# used below as the 'insecure' entry of the generated client arguments.
CGTS_INSECURE_SSL = os.environ.get("CGTS_INSECURE_SSL", "0") == "1"

# Fallback auth URL used when OS_AUTH_URL is not set in the environment.
_DEFAULT_STX_URL = "http://192.168.204.1:5000/v3"
# Port and path substituted into the STX auth URL to derive the DC manager
# URL; both can be overridden through the environment.
_DCMANAGER_URL_PORT = os.environ.get("DCMANAGER_API_PORT", "8119")
_DCMANAGER_URL_PATH = os.environ.get("DCMANAGER_API_PATH", "/v1.0")
30
31
def get_config_path():
    """Return the path of the O2 application configuration file.

    The value comes from the ``O2APP_CONFIG`` environment variable and
    falls back to ``/configs/o2app.conf`` when the variable is unset.
    """
    return os.environ.get("O2APP_CONFIG", "/configs/o2app.conf")
35
36
def get_smo_ca_config_path():
    """Return the path of the SMO CA certificate file.

    The value comes from the ``SMO_CA_CONFIG`` environment variable and
    falls back to ``/configs/smoca.crt`` when the variable is unset.
    """
    return os.environ.get("SMO_CA_CONFIG", "/configs/smoca.crt")
40
41
def get_postgres_uri():
    """Build the PostgreSQL connection URI for the ``o2ims`` database.

    Host, port and password are read from the ``DB_HOST``, ``DB_PORT`` and
    ``DB_PASSWORD`` environment variables (defaults: ``localhost``,
    ``5432``, ``o2ims123``). User and database name are fixed to ``o2ims``.

    The password is percent-encoded so that characters such as ``@``,
    ``/``, ``:`` or ``#`` cannot be mistaken for URI delimiters.

    Returns:
        str: a URI of the form ``postgresql://user:password@host:port/db``.

    Raises:
        ValueError: if ``DB_PORT`` is not an integer.
    """
    # Function-scope import: the module only imports ``urlparse``.
    from urllib.parse import quote_plus

    host = os.environ.get("DB_HOST", "localhost")
    port = int(os.environ.get("DB_PORT", 5432))
    password = quote_plus(os.environ.get("DB_PASSWORD", "o2ims123"))
    user, db_name = "o2ims", "o2ims"
    return f"postgresql://{user}:{password}@{host}:{port}/{db_name}"
48
49
def get_api_url():
    """Return the base ``https`` URL under which the API is reachable.

    The external floating host is taken from the
    ``API_HOST_EXTERNAL_FLOATING`` environment variable and overridden by
    the ``OCLOUD.API_HOST_EXTERNAL_FLOATING`` config option when that is
    non-empty. With an external host the port is 30205. Otherwise the
    internal host (``API_HOST``, default ``localhost``) is used, with port
    5005 for ``localhost`` and 80 for anything else.
    """
    external = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        external = configured

    if external is None or external == '':
        # No external address: fall back to the internal host and ports.
        host = os.environ.get("API_HOST", "localhost")
        port = 5005 if host == "localhost" else 80
    else:
        host = external
        port = 30205
    return f"https://{host}:{port}"
64
65
def get_stx_url():
    """Return the ``auth_url`` entry of the STX client arguments.

    Logs an error and exits the process with status 1 if the lookup raises
    ``KeyError``.
    """
    try:
        client_args = get_stx_client_args()
        return client_args["auth_url"]
    except KeyError:
        message = ('Please source your RC file before execution, '
                   'e.g.: `source ~/downloads/admin-rc.sh`')
        logger.error(message)
        sys.exit(1)
73
74
def get_dc_manager_url():
    """Return the DC manager API URL.

    The ``DCMANAGER_OS_AUTH_URL`` environment variable wins when set.
    Otherwise the URL is derived from the STX auth URL by swapping in the
    DC manager port and path while keeping the scheme and host.

    Returns:
        str: the DC manager URL.
    """
    auth_url = os.environ.get("DCMANAGER_OS_AUTH_URL", None)
    if auth_url is not None:
        return auth_url

    stx_url = urlparse(get_stx_url())
    host = stx_url.hostname
    # ``hostname`` strips the brackets of an IPv6 literal. A host containing
    # ':' can only be such a literal, so re-bracket it; otherwise the colons
    # of the address would be read as the port separator.
    if host is not None and ":" in host:
        host = f"[{host}]"
    dc_url = stx_url._replace(
        netloc=f"{host}:{_DCMANAGER_URL_PORT}",
        path=_DCMANAGER_URL_PATH)
    return dc_url.geturl()
84
85
def get_root_api_base():
    """Return the root path under which all API bases are mounted."""
    root = "/"
    return root
88
89
def get_o2ims_api_base():
    """Return the base path of the O2 IMS infrastructure inventory API."""
    return f"{get_root_api_base()}o2ims-infrastructureInventory"
92
93
def get_o2ims_monitoring_api_v1():
    """Return the version path segment of the monitoring API (v1)."""
    version_path = '/v1'
    return version_path
96
97
def get_o2ims_inventory_api_v1():
    """Return the version path segment of the inventory API (v1)."""
    version_path = '/v1'
    return version_path
100
101
def get_o2ims_monitoring_api_base():
    """Return the base path of the O2 IMS infrastructure monitoring API."""
    return f"{get_root_api_base()}o2ims-infrastructureMonitoring"
104
105
def get_o2dms_api_base():
    """Return the base path of the O2 DMS API, including its ``v1`` part."""
    return f"{get_root_api_base()}o2dms/v1"
108
109
def get_redis_host_and_port():
    """Return the Redis connection settings as a ``host``/``port`` dict.

    ``REDIS_HOST`` defaults to ``localhost`` and ``REDIS_PORT`` to 6379;
    the port is converted to ``int``.
    """
    return {
        "host": os.environ.get("REDIS_HOST", "localhost"),
        "port": int(os.environ.get("REDIS_PORT", 6379)),
    }
114
115
def get_smo_o2endpoint():
    """Return the SMO O2 endpoint URL.

    Read from the ``SMO_O2_ENDPOINT`` environment variable, with
    ``http://localhost/smo_sim`` as the fallback.
    """
    fallback = "http://localhost/smo_sim"
    return os.environ.get("SMO_O2_ENDPOINT", fallback)
120
121
def get_stx_client_args():
    """Collect the STX client credentials.

    Each value is read from its ``OS_*`` environment variable (with a
    built-in default) and then overridden by the ``OCLOUD`` config option
    of the same name when that option is neither ``None`` nor empty.

    Returns:
        dict: keys ``auth_url``, ``username``, ``api_key`` and
        ``project_name``.
    """
    ocloud = config.conf.OCLOUD
    # (result key, environment variable / config option name, default)
    sources = (
        ('auth_url', 'OS_AUTH_URL', _DEFAULT_STX_URL),
        ('username', 'OS_USERNAME', "admin"),
        ('api_key', 'OS_PASSWORD', "fakepasswd1"),
        ('project_name', 'OS_PROJECT_NAME', "admin"),
    )
    client_args = {}
    for key, option, default in sources:
        value = os.environ.get(option, default)
        configured = getattr(ocloud, option)
        if configured is not None and configured != '':
            value = configured
        client_args[key] = value
    return client_args
142
143
def is_ipv6(address):
    """Return True when *address* parses as an IPv6 address.

    Anything ``ipaddress.ip_address`` rejects, and any IPv4 address,
    yields False.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        # Not an IP address at all.
        return False
    return parsed.version == 6
151
152
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
                        sub_is_https: bool = False):
    """Build the ``os_*`` keyword arguments for an STX client.

    Args:
        region_name: value stored under ``os_region_name``.
        subcloud_hostname: when non-empty, the host of the auth URL is
            replaced by this name (IPv6 literals are bracketed) and
            ``os_endpoint_type`` is set to ``public``.
        sub_is_https: when a subcloud host is given, force the ``https``
            scheme and add the ``insecure`` flag from
            ``CGTS_INSECURE_SSL``.

    Returns:
        dict: the STX client arguments prefixed with ``os_``, with
        ``os_api_key`` renamed to ``os_password``, plus
        ``os_region_name`` and ``api_version``.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}
    if "" != subcloud_hostname:
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = "[" + subcloud_hostname + "]"
        orig_auth_url = urlparse(get_stx_url())
        # ``hostname`` strips the brackets of an IPv6 literal. Replace the
        # bracketed form when the netloc contains it; otherwise the old
        # brackets stay and wrap the new host a second time.
        old_host = orig_auth_url.hostname
        if f"[{old_host}]" in orig_auth_url.netloc:
            old_host = f"[{old_host}]"
        new_auth_url = orig_auth_url._replace(
            netloc=orig_auth_url.netloc.replace(old_host, subcloud_hostname))
        if sub_is_https:
            new_auth_url = new_auth_url._replace(scheme='https')
            os_client_args['insecure'] = CGTS_INSECURE_SSL
        os_client_args['os_auth_url'] = new_auth_url.geturl()
        os_client_args['os_endpoint_type'] = 'public'
    os_client_args['os_password'] = os_client_args.pop('os_api_key')
    os_client_args['os_region_name'] = region_name
    os_client_args['api_version'] = 1
    return os_client_args
189
190
def get_dc_access_info():
    """Build the keyword arguments for a DC manager client.

    The DC manager URL is taken from ``get_dc_manager_url()`` with its
    host replaced by the host of the STX auth URL (bracketed when it is
    an IPv6 literal).

    Returns:
        dict: ``dcmanager_url``, ``auth_url``, ``username``, ``api_key``,
        ``project_name`` and the two ``*_domain_name`` entries set to
        ``Default``.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}
    auth_url = urlparse(os_client_args.pop('os_auth_url'))
    hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \
        else auth_url.hostname
    dcmanager_url = urlparse(get_dc_manager_url())
    # ``hostname`` strips the brackets of an IPv6 literal. Replace the
    # bracketed form when the netloc contains it, so the already bracketed
    # replacement does not end up as "[[addr]]".
    old_host = dcmanager_url.hostname
    if f"[{old_host}]" in dcmanager_url.netloc:
        old_host = f"[{old_host}]"
    dcmanager_url = dcmanager_url._replace(
        netloc=dcmanager_url.netloc.replace(old_host, hostname))

    os_client_args['dcmanager_url'] = dcmanager_url.geturl()
    os_client_args['auth_url'] = auth_url.geturl()
    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['api_key'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
218
219
def get_fm_access_info(subcloud_hostname: str = "",
                       sub_is_https: bool = False):
    """Build the keyword arguments for a fault-management client.

    Args:
        subcloud_hostname: when non-empty, the host of the auth URL is
            replaced by this name (IPv6 literals are bracketed) and
            ``endpoint_type`` is set to ``publicURL``.
        sub_is_https: when a subcloud host is given, force the ``https``
            scheme on the auth URL.

    Returns:
        dict: ``auth_url``, ``insecure``, ``username``, ``password``,
        ``project_name`` and the two ``*_domain_name`` entries set to
        ``Default`` (plus ``endpoint_type`` for subclouds).
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}

    auth_url = urlparse(os_client_args.pop('os_auth_url'))
    os_client_args['auth_url'] = auth_url.geturl()

    if "" != subcloud_hostname:
        subcloud_hostname = f"[{subcloud_hostname}]" if \
            is_ipv6(subcloud_hostname) else subcloud_hostname
        orig_auth_url = urlparse(get_stx_url())
        # ``hostname`` strips the brackets of an IPv6 literal. Replace the
        # bracketed form when the netloc contains it; otherwise the old
        # brackets stay and wrap the new host a second time.
        old_host = orig_auth_url.hostname
        if f"[{old_host}]" in orig_auth_url.netloc:
            old_host = f"[{old_host}]"
        new_auth_url = orig_auth_url._replace(
            netloc=orig_auth_url.netloc.replace(old_host, subcloud_hostname))
        if sub_is_https:
            new_auth_url = new_auth_url._replace(scheme='https')
        os_client_args['auth_url'] = new_auth_url.geturl()
        os_client_args['endpoint_type'] = 'publicURL'

    os_client_args['insecure'] = CGTS_INSECURE_SSL

    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['password'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
259
260
def get_k8s_api_endpoint():
    """Return the Kubernetes access settings from the environment.

    Returns:
        tuple: the values of ``K8S_KUBECONFIG``, ``K8S_APISERVER`` and
        ``K8S_TOKEN``, in that order; each is ``None`` when unset.
    """
    names = ("K8S_KUBECONFIG", "K8S_APISERVER", "K8S_TOKEN")
    return tuple(os.environ.get(name, None) for name in names)
266
267
def get_helm_cli():
    """Return the absolute path of the helm executable."""
    helm_binary = '/usr/local/bin/helm'
    return helm_binary
270
271
def get_containers_shared_folder():
    """Return the path of the folder shared between containers."""
    shared_folder = '/share'
    return shared_folder
274
275
def get_system_controller_as_respool():
    """Return the hard-coded system-controller-as-respool flag (True)."""
    enabled = True
    return enabled
278
279
def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
                        admin_client_cert, admin_client_key):
    """Assemble a kubeconfig-shaped dictionary for a single cluster.

    Layout reference:
    https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/

    Args:
        cluster_api_endpoint: stored as the cluster ``server``.
        cluster_ca_cert: stored as ``certificate-authority-data``.
        admin_user: stored as the ``name`` of the single user entry.
        admin_client_cert: stored as ``client-certificate-data``.
        admin_client_key: stored as ``client-key-data``.

    Returns:
        dict: one cluster (``inf-cluster``), one context
        (``kubernetes-admin@inf-cluster``, also the current context) and
        one user entry.
    """
    cluster_entry = {
        'cluster': {
            'server': cluster_api_endpoint,
            'certificate-authority-data': cluster_ca_cert,
        },
        'name': 'inf-cluster',
    }
    context_entry = {
        'context': {
            'cluster': 'inf-cluster',
            'user': 'kubernetes-admin',
        },
        'name': 'kubernetes-admin@inf-cluster',
    }
    user_entry = {
        'name': admin_user,
        'user': {
            'client-certificate-data': admin_client_cert,
            'client-key-data': admin_client_key,
        },
    }
    return {
        'apiVersion': 'v1',
        'clusters': [cluster_entry],
        'contexts': [context_entry],
        'current-context': 'kubernetes-admin@inf-cluster',
        'kind': 'Config',
        'preferences': {},
        'users': [user_entry],
    }
322
323
def get_helmcli_access():
    """Return the helm CLI access triple ``(host:port, user, password)``.

    The external floating host comes from ``API_HOST_EXTERNAL_FLOATING``
    in the environment, overridden by the ``OCLOUD`` config option of the
    same name when that is non-empty. With an external host the port is
    30022; otherwise ``127.0.0.1`` with port 10022 is used. The user is
    always ``helm`` and the password is read from ``HELM_USER_PASSWD``
    (``None`` when unset).
    """
    floating_host = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        floating_host = configured

    if floating_host is None or floating_host == '':
        endpoint = "127.0.0.1" + ':' + "10022"
    else:
        endpoint = floating_host + ':' + "30022"

    return endpoint, 'helm', os.environ.get("HELM_USER_PASSWD")
339
340
def get_alarm_yaml_filename():
    """Return the alarm definition YAML path.

    Uses the ``ALARM_YAML`` environment variable when it names an existing
    file; otherwise returns ``/configs/alarm.yaml``.
    """
    candidate = os.environ.get("ALARM_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/alarm.yaml"
    return candidate
346
347
def get_events_yaml_filename():
    """Return the events definition YAML path.

    Uses the ``EVENTS_YAML`` environment variable when it names an
    existing file; otherwise returns ``/configs/events.yaml``.
    """
    candidate = os.environ.get("EVENTS_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/events.yaml"
    return candidate
353
354
355 # get k8s host from env:
def get_k8s_host():
    """Return the value of ``KUBERNETES_SERVICE_HOST``.

    Raises:
        Exception: when the environment variable is not set.
    """
    k8s_host = os.environ.get("KUBERNETES_SERVICE_HOST")
    if k8s_host is not None:
        return k8s_host
    raise Exception('Get k8s host failed.')
361
362
363 # get k8s host port from env:
def get_k8s_port():
    """Return ``KUBERNETES_SERVICE_PORT_HTTPS`` as a string.

    Falls back to ``'443'`` when the variable is not set.
    """
    return os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", '443')
367
368
369 # token review url
def get_review_url():
    """Return the ``https`` URL of the Kubernetes TokenReview endpoint.

    The URL is built from ``get_k8s_host()`` and ``get_k8s_port()``. A host
    containing ``:`` and not already bracketed is wrapped in square
    brackets, since it would otherwise be ambiguous with the port
    separator (the case of a bare IPv6 address).

    Raises:
        Exception: when the host or port cannot be determined; the
            original error is attached as the cause.
    """
    api = '/apis/authentication.k8s.io/v1/tokenreviews'
    try:
        host = get_k8s_host()
        port = get_k8s_port()
    except Exception as err:
        raise Exception('Get k8s review url failed') from err
    if ':' in host and not host.startswith('['):
        host = f'[{host}]'
    return "{0}{1}:{2}{3}".format('https://', host, port, api)
377
378
379 # get reviewer token
def get_reviewer_token():
    """Read and return the service account token used for token reviews.

    The token is read, unmodified, from the fixed path
    ``/var/run/secrets/kubernetes.io/serviceaccount/token``.
    """
    token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    with open(token_path, 'r') as token_file:
        return token_file.read()
386
387
def get_auth_provider():
    """Return the ``auth_provider`` value of the loaded configuration."""
    provider = config.conf.auth_provider
    return provider
390
391
def get_dms_support_profiles():
    """Return the list of supported DMS profiles.

    The ``API.DMS_SUPPORT_PROFILES`` config value is treated as a
    comma-separated list that may be wrapped in ``[]`` and may quote its
    items. Each name is stripped of surrounding whitespace and empty
    entries are dropped, so ``"['a', 'b']"`` yields ``['a', 'b']`` rather
    than ``['a', ' b']``.

    Returns:
        list: the profile names; ``native_k8sapi`` is appended when it is
        not already present.
    """
    profiles_list = []
    profiles_str = config.conf.API.DMS_SUPPORT_PROFILES
    if profiles_str:
        unquoted = profiles_str.strip(' []').replace("'", "").replace(
            '"', "")
        # Strip each item so that membership tests (including the
        # 'native_k8sapi' check below) are not defeated by the space that
        # usually follows a comma.
        stripped = (name.strip() for name in unquoted.split(','))
        profiles_list = [name for name in stripped if name]
    if 'native_k8sapi' not in profiles_list:
        profiles_list.append('native_k8sapi')
    return profiles_list