Fix install O2 on subcloud failed
[pti/o2.git] / o2common / config / config.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import os
16 import sys
17 import ipaddress
18 from urllib.parse import urlparse
19
20 from o2common import config
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
# True only when the CGTS_INSECURE_SSL environment variable is exactly "1";
# the value is handed to the clients as their 'insecure' option.
CGTS_INSECURE_SSL = os.getenv("CGTS_INSECURE_SSL", "0") == "1"

# Auth URL used when OS_AUTH_URL is not set in the environment.
_DEFAULT_STX_URL = "http://192.168.204.1:5000/v3"
# Port and path substituted into the auth URL to build the DC manager URL.
_DCMANAGER_URL_PORT = os.getenv("DCMANAGER_API_PORT", "8119")
_DCMANAGER_URL_PATH = os.getenv("DCMANAGER_API_PATH", "/v1.0")

30
31
def get_config_path():
    """Return the app config file path.

    The ``O2APP_CONFIG`` environment variable wins; otherwise the default
    ``/configs/o2app.conf`` is returned.
    """
    return os.getenv("O2APP_CONFIG", "/configs/o2app.conf")
35
36
def get_smo_ca_config_path():
    """Return the SMO CA certificate path.

    The ``SMO_CA_CONFIG`` environment variable wins; otherwise the default
    ``/configs/smoca.crt`` is returned.
    """
    return os.getenv("SMO_CA_CONFIG", "/configs/smoca.crt")
40
41
def get_postgres_uri():
    """Build the PostgreSQL connection URI from the environment.

    Host, port and password come from ``DB_HOST``, ``DB_PORT`` and
    ``DB_PASSWORD`` (with built-in defaults); user and database name are
    both fixed to ``o2ims``.
    """
    env = os.environ
    db_host = env.get("DB_HOST", "localhost")
    db_port = int(env.get("DB_PORT", 5432))
    db_password = env.get("DB_PASSWORD", "o2ims123")
    db_user = "o2ims"
    database = "o2ims"
    return (
        f"postgresql://{db_user}:{db_password}"
        f"@{db_host}:{db_port}/{database}"
    )
48
49
def get_api_url():
    """Return the ``https://host:port`` base URL of the API.

    The external floating host is taken from the
    ``API_HOST_EXTERNAL_FLOATING`` environment variable and is overridden
    by the OCLOUD config option of the same name when that is non-empty.
    With an external host the port is 30205; without one the host is
    ``API_HOST`` (default ``localhost``) and the port is 5005 for
    ``localhost`` or 80 for anything else.
    """
    internal_host = os.environ.get("API_HOST", "localhost")
    external_host = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        external_host = configured

    if external_host is None or external_host == '':
        host = internal_host
        port = 5005 if host == "localhost" else 80
    else:
        host = external_host
        port = 30205
    return f"https://{host}:{port}"
64
65
def get_region_name():
    """Return ``OS_REGION_NAME`` from the environment, or ``RegionOne``."""
    return os.getenv("OS_REGION_NAME", "RegionOne")
69
70
def get_stx_url():
    """Return the ``auth_url`` entry of the STX client arguments.

    Logs an error and exits the process with status 1 if a ``KeyError``
    is raised while obtaining it.
    """
    try:
        stx_args = get_stx_client_args()
        return stx_args["auth_url"]
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)
78
79
def get_dc_manager_url():
    """Return the DC manager API URL.

    ``DCMANAGER_OS_AUTH_URL`` from the environment is returned verbatim
    when set. Otherwise the URL is derived from the STX auth URL by
    swapping in the DC manager port and path (any user-info part of the
    auth URL is dropped).
    """
    auth_url = os.environ.get("DCMANAGER_OS_AUTH_URL", None)
    if auth_url is not None:
        return auth_url

    parsed = urlparse(get_stx_url())
    host = parsed.hostname
    # ``hostname`` strips the brackets of an IPv6 literal; they must be put
    # back, otherwise the port cannot be told apart from the address.
    if is_ipv6(host):
        host = f"[{host}]"
    derived = parsed._replace(
        netloc=f"{host}:{_DCMANAGER_URL_PORT}",
        path=_DCMANAGER_URL_PATH)
    return derived.geturl()
89
90
def get_root_api_base():
    """Return the root prefix shared by the API base paths."""
    root = "/"
    return root
93
94
def get_o2ims_api_base():
    """Return the infrastructure inventory API base path."""
    return f"{get_root_api_base()}o2ims-infrastructureInventory"
97
98
def get_o2ims_monitoring_api_v1():
    """Return the v1 version segment for the monitoring API."""
    version_segment = '/v1'
    return version_segment
101
102
def get_o2ims_inventory_api_v1():
    """Return the v1 version segment for the inventory API."""
    version_segment = '/v1'
    return version_segment
105
106
def get_o2ims_monitoring_api_base():
    """Return the infrastructure monitoring API base path."""
    return f"{get_root_api_base()}o2ims-infrastructureMonitoring"
109
110
def get_o2dms_api_base():
    """Return the O2 DMS v1 API base path."""
    return f"{get_root_api_base()}o2dms/v1"
113
114
def get_redis_host_and_port():
    """Return the Redis endpoint as ``{'host': str, 'port': int}``.

    Values come from ``REDIS_HOST`` / ``REDIS_PORT`` and default to
    ``localhost`` / ``6379``.
    """
    return {
        "host": os.environ.get("REDIS_HOST", "localhost"),
        "port": int(os.environ.get("REDIS_PORT", 6379)),
    }
119
120
def get_smo_o2endpoint():
    """Return ``SMO_O2_ENDPOINT`` or the ``http://localhost/smo_sim`` default."""
    return os.getenv("SMO_O2_ENDPOINT", "http://localhost/smo_sim")
125
126
def get_stx_client_args():
    """Collect the STX client credentials.

    Each entry starts from its ``OS_*`` environment variable (or a
    built-in default) and is replaced by the OCLOUD config option of the
    same name whenever that option is neither ``None`` nor empty.

    Returns a dict with the keys ``auth_url``, ``username``, ``api_key``
    and ``project_name``.
    """
    env = os.environ
    client_args = {
        'auth_url': env.get('OS_AUTH_URL', _DEFAULT_STX_URL),
        'username': env.get('OS_USERNAME', "admin"),
        'api_key': env.get('OS_PASSWORD', "fakepasswd1"),
        'project_name': env.get('OS_PROJECT_NAME', "admin"),
    }
    # (client argument, OCLOUD option) pairs, config taking precedence.
    overrides = (
        ('auth_url', 'OS_AUTH_URL'),
        ('username', 'OS_USERNAME'),
        ('api_key', 'OS_PASSWORD'),
        ('project_name', 'OS_PROJECT_NAME'),
    )
    for arg_name, option in overrides:
        configured = getattr(config.conf.OCLOUD, option)
        if configured is not None and configured != '':
            client_args[arg_name] = configured
    return client_args
147
148
def is_ipv6(address):
    """Return True when ``address`` parses as an IPv6 address.

    Anything ``ipaddress.ip_address`` rejects with ``ValueError`` (host
    names, bracketed literals, empty strings, ...) yields False.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        return False
    return isinstance(parsed, ipaddress.IPv6Address)
156
157
def get_stx_access_info(region_name=None,
                        subcloud_hostname: str = "",
                        sub_is_https: bool = False):
    """Build the ``os_*`` keyword arguments for the STX client.

    :param region_name: value stored as ``os_region_name``. When omitted
        (``None``) it is resolved with ``get_region_name()`` at call time,
        so it follows the same environment lookup as the rest of this
        module instead of being frozen when the module is imported.
    :param subcloud_hostname: when non-empty, the host of the auth URL is
        replaced by this value (IPv6 literals get bracketed) and
        ``os_endpoint_type`` is set to ``'public'``.
    :param sub_is_https: with a subcloud host, force the ``https`` scheme
        and set ``insecure`` from ``CGTS_INSECURE_SSL``.
    :return: dict of client arguments.

    Logs an error and exits with status 1 if collecting the client
    arguments raises ``KeyError``.
    """
    if region_name is None:
        region_name = get_region_name()

    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}

    if subcloud_hostname != "":
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = f"[{subcloud_hostname}]"
        orig_auth_url = urlparse(get_stx_url())
        # Rebuild the netloc from its parts rather than string-replacing
        # the host: ``hostname`` is lower-cased and has no IPv6 brackets,
        # so a textual replace yields "[[addr]]" for a bracketed original
        # or silently misses a mixed-case host.
        userinfo, at_sign, host_port = orig_auth_url.netloc.rpartition('@')
        if host_port.startswith('['):
            port_suffix = host_port.partition(']')[2]
        else:
            _, colon, port = host_port.rpartition(':')
            port_suffix = colon + port if colon else ''
        new_auth_url = orig_auth_url._replace(
            netloc=f"{userinfo}{at_sign}{subcloud_hostname}{port_suffix}")
        if sub_is_https:
            new_auth_url = new_auth_url._replace(scheme='https')
            os_client_args['insecure'] = CGTS_INSECURE_SSL
        os_client_args['os_auth_url'] = new_auth_url.geturl()
        os_client_args['os_endpoint_type'] = 'public'

    # The client expects the password under 'os_password'.
    os_client_args['os_password'] = os_client_args.pop('os_api_key')
    os_client_args['os_region_name'] = region_name
    os_client_args['api_version'] = 1
    return os_client_args
195
196
def get_dc_access_info():
    """Build the keyword arguments for the DC manager client.

    The DC manager URL returned by ``get_dc_manager_url()`` has its host
    replaced by the host of the auth URL (bracketed when it is an IPv6
    literal); its port, path and any user-info part are kept.

    :return: dict with ``dcmanager_url``, ``auth_url``, ``username``,
        ``api_key``, ``project_name`` and the two ``*_domain_name`` keys.
    :raises ValueError: if the auth URL contains no host.

    Logs an error and exits with status 1 if collecting the client
    arguments raises ``KeyError``.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}
    auth_url = urlparse(os_client_args.pop('os_auth_url'))
    if auth_url.hostname is None:
        raise ValueError(
            f"auth URL has no host component: {auth_url.geturl()!r}")
    hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \
        else auth_url.hostname

    dcmanager_url = urlparse(get_dc_manager_url())
    # Rebuild the netloc from its parts rather than string-replacing the
    # host: ``hostname`` is lower-cased and has no IPv6 brackets, so a
    # textual replace produces "[[addr]]" or a mangled authority for IPv6.
    userinfo, at_sign, host_port = dcmanager_url.netloc.rpartition('@')
    if host_port.startswith('['):
        port_suffix = host_port.partition(']')[2]
    else:
        # Splitting on the last colon also copes with an unbracketed
        # "addr:port" authority.
        _, colon, port = host_port.rpartition(':')
        port_suffix = colon + port if colon else ''
    dcmanager_url = dcmanager_url._replace(
        netloc=f"{userinfo}{at_sign}{hostname}{port_suffix}")

    os_client_args['dcmanager_url'] = dcmanager_url.geturl()
    os_client_args['auth_url'] = auth_url.geturl()
    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['api_key'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
224
225
def get_fm_access_info(subcloud_hostname: str = "",
                       sub_is_https: bool = False):
    """Build the keyword arguments for the FM client.

    :param subcloud_hostname: when non-empty, the host of the auth URL is
        replaced by this value (IPv6 literals get bracketed) and
        ``endpoint_type`` is set to ``'publicURL'``.
    :param sub_is_https: with a subcloud host, force the ``https`` scheme.
    :return: dict with ``auth_url``, ``insecure``, ``username``,
        ``password``, ``project_name`` and the two ``*_domain_name`` keys
        (plus ``endpoint_type`` for a subcloud).

    Logs an error and exits with status 1 if collecting the client
    arguments raises ``KeyError``.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {f'os_{key}': val for key, val in client_args.items()}

    auth_url = urlparse(os_client_args.pop('os_auth_url'))
    os_client_args['auth_url'] = auth_url.geturl()

    if subcloud_hostname != "":
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = f"[{subcloud_hostname}]"
        orig_auth_url = urlparse(get_stx_url())
        # Rebuild the netloc from its parts rather than string-replacing
        # the host: ``hostname`` is lower-cased and has no IPv6 brackets,
        # so a textual replace yields "[[addr]]" for a bracketed original
        # or silently misses a mixed-case host.
        userinfo, at_sign, host_port = orig_auth_url.netloc.rpartition('@')
        if host_port.startswith('['):
            port_suffix = host_port.partition(']')[2]
        else:
            _, colon, port = host_port.rpartition(':')
            port_suffix = colon + port if colon else ''
        new_auth_url = orig_auth_url._replace(
            netloc=f"{userinfo}{at_sign}{subcloud_hostname}{port_suffix}")
        if sub_is_https:
            new_auth_url = new_auth_url._replace(scheme='https')
        os_client_args['auth_url'] = new_auth_url.geturl()
        os_client_args['endpoint_type'] = 'publicURL'

    os_client_args['insecure'] = CGTS_INSECURE_SSL

    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['password'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
265
266
def get_k8s_api_endpoint():
    """Return ``(K8S_KUBECONFIG, K8S_APISERVER, K8S_TOKEN)`` from the env.

    Each element is ``None`` when its environment variable is not set.
    """
    variable_names = ("K8S_KUBECONFIG", "K8S_APISERVER", "K8S_TOKEN")
    return tuple(os.environ.get(name, None) for name in variable_names)
272
273
def get_helm_cli():
    """Return the fixed path of the helm executable."""
    helm_path = '/usr/local/bin/helm'
    return helm_path
276
277
def get_containers_shared_folder():
    """Return the fixed path of the folder shared between containers."""
    shared_folder = '/share'
    return shared_folder
280
281
def get_system_controller_as_respool():
    """Return the hard-coded flag value, which is always ``True``."""
    enabled = True
    return enabled
284
285
def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
                        admin_client_cert, admin_client_key):
    """Build a kubeconfig document as a plain dict.

    Reference:
    https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/

    :param cluster_api_endpoint: value of the cluster ``server`` field.
    :param cluster_ca_cert: value of ``certificate-authority-data``.
    :param admin_user: name of the single entry in ``users``; the context
        refers to this same name.
    :param admin_client_cert: value of ``client-certificate-data``.
    :param admin_client_key: value of ``client-key-data``.
    :return: dict with one cluster (``inf-cluster``), one user and one
        context (``kubernetes-admin@inf-cluster``) set as current context.
    """
    cluster_name = 'inf-cluster'
    context_name = 'kubernetes-admin@inf-cluster'

    data = {
        'apiVersion': 'v1',
        'clusters': [
            {
                'cluster': {
                    'server': cluster_api_endpoint,
                    'certificate-authority-data': cluster_ca_cert,
                },
                'name': cluster_name,
            },
        ],
        'contexts': [
            {
                'context': {
                    'cluster': cluster_name,
                    # Must match the name in 'users' below; a hard-coded
                    # 'kubernetes-admin' leaves the context pointing at a
                    # missing user whenever admin_user differs.
                    'user': admin_user,
                },
                'name': context_name,
            },
        ],
        'current-context': context_name,
        'kind': 'Config',
        'preferences': {},
        'users': [
            {
                'name': admin_user,
                'user': {
                    'client-certificate-data': admin_client_cert,
                    'client-key-data': admin_client_key,
                },
            },
        ],
    }

    return data
328
329
def get_helmcli_access():
    """Return ``(host:port, user, password)`` for reaching the helm CLI.

    The external floating host comes from ``API_HOST_EXTERNAL_FLOATING``
    in the environment, overridden by the OCLOUD config option of the
    same name when that is non-empty. With an external host the endpoint
    is ``<host>:30022``; otherwise it is ``127.0.0.1:10022``. The user is
    always ``helm`` and the password is read from ``HELM_USER_PASSWD``
    (``None`` when unset).
    """
    external_host = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        external_host = configured

    if external_host is None or external_host == '':
        endpoint = "127.0.0.1" + ':' + "10022"
    else:
        endpoint = external_host + ':' + "30022"

    password = os.environ.get("HELM_USER_PASSWD")
    return endpoint, 'helm', password
345
346
def get_alarm_yaml_filename():
    """Return the alarm YAML file to use.

    ``ALARM_YAML`` is honoured only when it names an existing regular
    file; otherwise ``/configs/alarm.yaml`` is returned.
    """
    candidate = os.environ.get("ALARM_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/alarm.yaml"
    return candidate
352
353
def get_events_yaml_filename():
    """Return the events YAML file to use.

    ``EVENTS_YAML`` is honoured only when it names an existing regular
    file; otherwise ``/configs/events.yaml`` is returned.
    """
    candidate = os.environ.get("EVENTS_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/events.yaml"
    return candidate
359
360
def get_k8s_host():
    """Return the k8s host read from ``KUBERNETES_SERVICE_HOST``.

    Raises ``Exception`` when the variable is not set.
    """
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    if host is not None:
        return host
    raise Exception('Get k8s host failed.')
367
368
def get_k8s_port():
    """Return ``KUBERNETES_SERVICE_PORT_HTTPS`` from the env, or ``'443'``."""
    return os.getenv("KUBERNETES_SERVICE_PORT_HTTPS", '443')
373
374
def get_review_url():
    """Return the HTTPS URL of the k8s token review endpoint.

    The URL is assembled from ``get_k8s_host()`` and ``get_k8s_port()``;
    any failure while doing so is re-raised as a generic ``Exception``.
    """
    review_path = '/apis/authentication.k8s.io/v1/tokenreviews'
    try:
        host = get_k8s_host()
        port = get_k8s_port()
        return f"https://{host}:{port}{review_path}"
    except Exception:
        raise Exception('Get k8s review url failed')
383
384
def get_reviewer_token():
    """Return the contents of the service account token file."""
    # Fixed default location of the token file.
    token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    with open(token_path, 'r') as token_file:
        return token_file.read()
392
393
def get_auth_provider():
    """Return the ``auth_provider`` attribute of the loaded configuration."""
    provider = config.conf.auth_provider
    return provider
396
397
def get_dms_support_profiles():
    """Return the list of supported DMS profile names.

    The ``API.DMS_SUPPORT_PROFILES`` config value is read as a
    list-looking string such as ``"['a', 'b']"``: surrounding brackets
    and all quote characters are removed and the rest is split on commas.
    Each name is stripped of surrounding whitespace and empty names are
    dropped. ``native_k8sapi`` is always part of the result.
    """
    profiles_list = []
    profiles_str = config.conf.API.DMS_SUPPORT_PROFILES
    if profiles_str:
        cleaned = profiles_str.strip(' []').replace("'", "").replace(
            '"', "")
        # Strip every item: "a, b" would otherwise yield ' b', which
        # defeats the membership test below and any lookup by callers.
        profiles_list = [
            name.strip() for name in cleaned.split(',') if name.strip()
        ]
    if 'native_k8sapi' not in profiles_list:
        profiles_list.append('native_k8sapi')
    return profiles_list
407         profiles_list.append('native_k8sapi')
408     return profiles_list