Fix the IPv6 does not work
[pti/o2.git] / o2common / config / config.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import os
16 import sys
17 import ipaddress
18 from urllib.parse import urlparse
19
20 from o2common import config
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
# Template for the dcmanager endpoint: get_dc_access_info() keeps its scheme,
# port and path and substitutes the host taken from the auth URL.
_DEFAULT_DCMANAGER_URL = "http://192.168.204.1:8119/v1.0"
# Fallback auth URL when OS_AUTH_URL is not set; also the template whose
# host is swapped out when building subcloud auth URLs.
_DEFAULT_STX_URL = "http://192.168.204.1:5000/v3"
27
28
def get_config_path():
    """Return the path of the o2app configuration file.

    The ``O2APP_CONFIG`` environment variable takes precedence; when it is
    not set the path falls back to ``/configs/o2app.conf``.
    """
    return os.environ.get("O2APP_CONFIG", "/configs/o2app.conf")
32
33
def get_smo_ca_config_path():
    """Return the path of the SMO CA certificate file.

    The ``SMO_CA_CONFIG`` environment variable takes precedence; when it is
    not set the path falls back to ``/configs/smoca.crt``.
    """
    return os.environ.get("SMO_CA_CONFIG", "/configs/smoca.crt")
37
38
def get_postgres_uri():
    """Build the PostgreSQL connection URI for the ``o2ims`` database.

    Host, port and password are read from the ``DB_HOST``, ``DB_PORT`` and
    ``DB_PASSWORD`` environment variables (defaults: ``localhost``,
    ``5432`` and ``o2ims123``). User and database name are both fixed to
    ``o2ims``.

    Returns:
        str: URI of the form ``postgresql://user:password@host:port/db``.

    Raises:
        ValueError: if ``DB_PORT`` is set to something that is not an
            integer.
    """
    host = os.environ.get("DB_HOST", "localhost")
    # A bare IPv6 literal has to be bracketed, otherwise its colons cannot
    # be told apart from the host/port separator of the URI.
    if is_ipv6(host):
        host = f"[{host}]"
    port = int(os.environ.get("DB_PORT", 5432))
    password = os.environ.get("DB_PASSWORD", "o2ims123")
    user, db_name = "o2ims", "o2ims"
    return f"postgresql://{user}:{password}@{host}:{port}/{db_name}"
45
46
def get_api_url():
    """Return the base ``https://host:port`` URL of the O2 API.

    The external floating host is taken from the OCLOUD configuration
    option ``API_HOST_EXTERNAL_FLOATING`` when it is non-empty, otherwise
    from the environment variable of the same name. When an external host
    is available it is used with port 30205. Otherwise the internal host
    (``API_HOST``, default ``localhost``) is used, with port 5005 for
    ``localhost`` and port 80 for anything else.

    IPv6 literal hosts are wrapped in square brackets so the resulting
    URL stays parseable.
    """
    host_internal = os.environ.get("API_HOST", "localhost")
    host_external = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    if config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING is not None and \
            config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING != '':
        host_external = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING

    if host_external is None or host_external == '':
        host = host_internal
        port = 5005 if host == "localhost" else 80
    else:
        host = host_external
        port = 30205

    # Without brackets the colons of an IPv6 literal collide with the
    # host/port separator (e.g. "https://fd00::1:30205").
    if is_ipv6(host):
        host = f"[{host}]"
    return f"https://{host}:{port}"
61
62
def get_root_api_base():
    """Return the root path prefix used to build the API base paths."""
    root = "/"
    return root
65
66
def get_o2ims_api_base():
    """Return the base path of the O2 IMS infrastructure inventory API."""
    return f"{get_root_api_base()}o2ims-infrastructureInventory"
69
70
def get_o2ims_monitoring_api_v1():
    """Return the version path segment of the monitoring API (``/v1``)."""
    version_segment = '/v1'
    return version_segment
73
74
def get_o2ims_inventory_api_v1():
    """Return the version path segment of the inventory API (``/v1``)."""
    version_segment = '/v1'
    return version_segment
77
78
def get_o2ims_monitoring_api_base():
    """Return the base path of the O2 IMS infrastructure monitoring API."""
    return f"{get_root_api_base()}o2ims-infrastructureMonitoring"
81
82
def get_o2dms_api_base():
    """Return the base path of the O2 DMS API, including its ``v1`` part."""
    return f"{get_root_api_base()}o2dms/v1"
85
86
def get_redis_host_and_port():
    """Return the Redis connection settings as ``{'host', 'port'}``.

    Values come from the ``REDIS_HOST`` and ``REDIS_PORT`` environment
    variables and default to ``localhost`` and ``6379``. The port is
    converted to ``int``, so a non-numeric ``REDIS_PORT`` raises
    ``ValueError``.
    """
    env = os.environ
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", 6379)),
    }
91
92
def get_smo_o2endpoint():
    """Return the SMO O2 endpoint URL.

    Read from the ``SMO_O2_ENDPOINT`` environment variable, defaulting to
    ``http://localhost/smo_sim`` when the variable is not set.
    """
    return os.environ.get("SMO_O2_ENDPOINT", "http://localhost/smo_sim")
97
98
def get_stx_client_args():
    """Collect the client credentials (auth URL, user, key, project).

    Each value is first read from its ``OS_*`` environment variable (with
    a built-in default) and then overridden by the OCLOUD configuration
    option of the same name whenever that option is neither ``None`` nor
    an empty string.

    Returns:
        dict: keys ``auth_url``, ``username``, ``api_key`` and
        ``project_name``.
    """
    env = os.environ
    client_args = {
        'auth_url': env.get('OS_AUTH_URL', _DEFAULT_STX_URL),
        'username': env.get('OS_USERNAME', "admin"),
        'api_key': env.get('OS_PASSWORD', "fakepasswd1"),
        'project_name': env.get('OS_PROJECT_NAME', "admin"),
    }

    # (client argument, OCLOUD option) pairs; the option wins when set.
    overrides = (
        ('auth_url', 'OS_AUTH_URL'),
        ('username', 'OS_USERNAME'),
        ('api_key', 'OS_PASSWORD'),
        ('project_name', 'OS_PROJECT_NAME'),
    )
    for arg_name, option_name in overrides:
        configured = getattr(config.conf.OCLOUD, option_name)
        if configured is not None and configured != '':
            client_args[arg_name] = configured
    return client_args
119
120
def is_ipv6(address):
    """Return True when *address* parses as an IPv6 address.

    Anything ``ipaddress.ip_address`` rejects with ``ValueError`` (host
    names, bracketed literals, malformed input) yields False, as does a
    valid IPv4 address.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        return False
    else:
        return parsed.version == 6
128
129
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
                        sub_is_https: bool = False):
    """Build the ``os_*`` keyword arguments for the STX client.

    The credentials from ``get_stx_client_args()`` are re-keyed with an
    ``os_`` prefix. When *subcloud_hostname* is non-empty the auth URL is
    rebuilt from the default STX URL with that host substituted (IPv6
    literals are bracketed) and the endpoint type is set to ``public``.

    Args:
        region_name: value stored under ``os_region_name``.
        subcloud_hostname: subcloud host or IP; empty string means the
            configured auth URL is kept.
        sub_is_https: when True the subcloud URL uses ``https`` and
            ``insecure`` is set to True.

    Returns:
        dict: client arguments; ``os_api_key`` is renamed to
        ``os_password`` and ``api_version`` is set to 1.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {
        f'os_{name}': value for name, value in client_args.items()
    }

    if subcloud_hostname != "":
        if is_ipv6(subcloud_hostname):
            # Bracket the literal so its colons do not clash with ":port".
            subcloud_hostname = f"[{subcloud_hostname}]"
        template_url = urlparse(_DEFAULT_STX_URL)
        subcloud_netloc = template_url.netloc.replace(
            template_url.hostname, subcloud_hostname)
        subcloud_url = template_url._replace(netloc=subcloud_netloc)
        if sub_is_https:
            subcloud_url = subcloud_url._replace(scheme='https')
            os_client_args['insecure'] = True
        os_client_args['os_auth_url'] = subcloud_url.geturl()
        os_client_args['os_endpoint_type'] = 'public'

    os_client_args['os_password'] = os_client_args.pop('os_api_key')
    os_client_args['os_region_name'] = region_name
    os_client_args['api_version'] = 1
    return os_client_args
176
177
def get_dc_access_info():
    """Build the keyword arguments for the dcmanager client.

    The credentials come from ``get_stx_client_args()``. The dcmanager
    URL is derived from the default dcmanager URL by replacing its host
    with the host of the auth URL; an IPv6 auth host is re-bracketed so
    the resulting URL stays valid.

    Returns:
        dict: ``dcmanager_url``, ``auth_url``, ``username``, ``api_key``,
        ``project_name`` plus ``user_domain_name`` and
        ``project_domain_name`` (both ``Default``).
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {}
    for key, val in client_args.items():
        os_client_args['os_{key}'.format(key=key)] = val
    auth_url = urlparse(os_client_args.pop('os_auth_url'))

    # urlparse().hostname drops the square brackets of an IPv6 literal;
    # put them back before splicing the host in front of ":port".
    auth_host = auth_url.hostname
    if is_ipv6(auth_host):
        auth_host = "[" + auth_host + "]"

    dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL)
    dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace(
        dcmanager_url.hostname, auth_host))

    os_client_args['dcmanager_url'] = dcmanager_url.geturl()
    os_client_args['auth_url'] = auth_url.geturl()
    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['api_key'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
209
210
def get_fm_access_info(subcloud_hostname: str = "",
                       sub_is_https: bool = False):
    """Build the keyword arguments for the FM client.

    The credentials come from ``get_stx_client_args()``. When
    *subcloud_hostname* is non-empty the auth URL is rebuilt from the
    default STX URL with that host substituted (IPv6 literals are
    bracketed, as in ``get_stx_access_info``) and ``endpoint_type`` is
    set to ``publicURL``.

    Args:
        subcloud_hostname: subcloud host or IP; empty string keeps the
            configured auth URL.
        sub_is_https: when True the subcloud auth URL uses ``https``.

    Returns:
        dict: ``auth_url``, ``username``, ``password``, ``project_name``,
        ``insecure`` (always True), ``user_domain_name`` and
        ``project_domain_name`` (both ``Default``), plus
        ``endpoint_type`` for subclouds.
    """
    try:
        client_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    os_client_args = {}
    for key, val in client_args.items():
        os_client_args['os_{key}'.format(key=key)] = val

    auth_url = urlparse(os_client_args.pop('os_auth_url'))
    os_client_args['auth_url'] = auth_url.geturl()

    if "" != subcloud_hostname:
        # A bare IPv6 literal must be bracketed before it is placed in
        # front of ":port", otherwise the URL becomes ambiguous.
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = "[" + subcloud_hostname + "]"
        orig_auth_url = urlparse(_DEFAULT_STX_URL)
        new_auth_url = orig_auth_url._replace(
            netloc=orig_auth_url.netloc.replace(
                orig_auth_url.hostname, subcloud_hostname))
        if sub_is_https:
            new_auth_url = new_auth_url._replace(scheme='https')
        os_client_args['auth_url'] = new_auth_url.geturl()
        os_client_args['endpoint_type'] = 'publicURL'

    os_client_args['insecure'] = True

    os_client_args['username'] = os_client_args.pop('os_username')
    os_client_args['password'] = os_client_args.pop('os_api_key')
    os_client_args['project_name'] = os_client_args.pop('os_project_name')
    os_client_args['user_domain_name'] = 'Default'
    os_client_args['project_domain_name'] = 'Default'

    return os_client_args
254
255
def get_k8s_api_endpoint():
    """Return ``(kubeconfig, apiserver, token)`` read from the environment.

    The values come from ``K8S_KUBECONFIG``, ``K8S_APISERVER`` and
    ``K8S_TOKEN``; each one is ``None`` when its variable is not set.
    """
    variable_names = ("K8S_KUBECONFIG", "K8S_APISERVER", "K8S_TOKEN")
    return tuple(os.environ.get(name, None) for name in variable_names)
261
262
def get_helm_cli():
    """Return the hard-coded path of the helm CLI."""
    helm_path = '/usr/local/bin/helm'
    return helm_path
265
266
def get_containers_shared_folder():
    """Return the hard-coded shared folder path (``/share``)."""
    shared_folder = '/share'
    return shared_folder
269
270
def get_system_controller_as_respool():
    """Return the hard-coded flag, currently always ``True``.

    NOTE(review): judging by the name this decides whether the system
    controller is treated as a resource pool -- confirm against callers.
    """
    enabled = True
    return enabled
273
274
def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
                        admin_client_cert, admin_client_key):
    """Assemble a kubeconfig document as a plain dict.

    The layout follows the KUBECONFIG format, see
    https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/

    Args:
        cluster_api_endpoint: stored as the cluster ``server``.
        cluster_ca_cert: stored as ``certificate-authority-data``.
        admin_user: name of the single entry in ``users``.
        admin_client_cert: stored as ``client-certificate-data``.
        admin_client_key: stored as ``client-key-data``.

    Returns:
        dict: one cluster (``inf-cluster``), one context
        (``kubernetes-admin@inf-cluster``, also the current context) and
        one user.
    """
    cluster_entry = {
        'cluster': {
            'server': cluster_api_endpoint,
            'certificate-authority-data': cluster_ca_cert,
        },
        'name': 'inf-cluster',
    }
    # NOTE(review): the context user is fixed to 'kubernetes-admin' while
    # the users entry is named after admin_user -- confirm they match.
    context_entry = {
        'context': {
            'cluster': 'inf-cluster',
            'user': 'kubernetes-admin',
        },
        'name': 'kubernetes-admin@inf-cluster',
    }
    user_entry = {
        'name': admin_user,
        'user': {
            'client-certificate-data': admin_client_cert,
            'client-key-data': admin_client_key,
        },
    }

    return {
        'apiVersion': 'v1',
        'clusters': [cluster_entry],
        'contexts': [context_entry],
        'current-context': 'kubernetes-admin@inf-cluster',
        'kind': 'Config',
        'preferences': {},
        'users': [user_entry],
    }
317
318
def get_helmcli_access():
    """Return ``(host:port, user, password)`` for reaching the helm CLI.

    The external floating host is taken from the OCLOUD configuration
    option ``API_HOST_EXTERNAL_FLOATING`` when it is non-empty, otherwise
    from the environment variable of the same name. With an external host
    the port is ``30022``; without one the target is ``127.0.0.1:10022``.
    The user is always ``helm`` and the password comes from
    ``HELM_USER_PASSWD`` (``None`` when unset).
    """
    host_external = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    if config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING is not None and \
            config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING != '':
        host_external = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING

    if host_external is None or host_external == '':
        host, port = "127.0.0.1", "10022"
    else:
        host, port = host_external, "30022"

    # NOTE(review): an IPv6 literal host is joined without brackets here;
    # confirm what format the consumer of this value expects.
    helm_host_with_port = ':'.join((host, port))
    return helm_host_with_port, 'helm', os.environ.get("HELM_USER_PASSWD")
334
335
def get_alarm_yaml_filename():
    """Return the alarm YAML file to load.

    The path in the ``ALARM_YAML`` environment variable is used only when
    it is set and points at an existing file; otherwise the default
    ``/configs/alarm.yaml`` is returned.
    """
    candidate = os.environ.get("ALARM_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/alarm.yaml"
    return candidate
341
342
def get_events_yaml_filename():
    """Return the events YAML file to load.

    The path in the ``EVENTS_YAML`` environment variable is used only when
    it is set and points at an existing file; otherwise the default
    ``/configs/events.yaml`` is returned.
    """
    candidate = os.environ.get("EVENTS_YAML")
    if candidate is None or not os.path.isfile(candidate):
        return "/configs/events.yaml"
    return candidate
348
349
350 # get k8s host from env:
def get_k8s_host():
    """Return the value of ``KUBERNETES_SERVICE_HOST``.

    Raises:
        Exception: when the environment variable is not set.
    """
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    if host is not None:
        return host
    raise Exception('Get k8s host failed.')
356
357
358 # get k8s host port from env:
def get_k8s_port():
    """Return ``KUBERNETES_SERVICE_PORT_HTTPS`` as a string.

    Falls back to ``'443'`` when the environment variable is not set.
    """
    return os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", '443')
362
363
364 # token review url
def get_review_url():
    """Return the HTTPS URL of the Kubernetes TokenReview API.

    Host and port come from ``get_k8s_host()`` and ``get_k8s_port()``.
    An IPv6 literal host is wrapped in square brackets so that
    ``host:port`` remains unambiguous.

    Raises:
        Exception: when the host or port cannot be determined; the
            underlying error is chained as the cause.
    """
    api = '/apis/authentication.k8s.io/v1/tokenreviews'
    try:
        host = get_k8s_host()
        port = get_k8s_port()
    except Exception as err:
        # Keep the original failure attached instead of discarding it.
        raise Exception('Get k8s review url failed') from err
    if is_ipv6(host):
        host = "[" + host + "]"
    return "{0}{1}:{2}{3}".format('https://', host, port, api)
372
373
374 # get reviewer token
def get_reviewer_token():
    """Read and return the service account token used for token reviews.

    The token is read from the fixed path
    ``/var/run/secrets/kubernetes.io/serviceaccount/token`` and returned
    exactly as stored (no stripping). Opening the file raises ``OSError``
    when it is missing or unreadable.
    """
    token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    with open(token_path, 'r') as token_file:
        return token_file.read()
381
382
def get_auth_provider():
    """Return the identifier of the auth provider, fixed to ``'k8s'``."""
    provider = 'k8s'
    return provider
385
386
def get_dms_support_profiles():
    """Return the list of supported DMS profile names.

    The ``DMS_SUPPORT_PROFILES`` option of the API configuration section
    is parsed as a comma-separated list that may be wrapped in square
    brackets and may quote its items. Surrounding whitespace is removed
    from every item and empty items are dropped, so ``"['a', 'b']"``
    yields ``['a', 'b']`` and ``"[]"`` yields no configured profile.

    ``native_k8sapi`` is appended when it is not already present.
    """
    profiles_list = []
    profiles_str = config.conf.API.DMS_SUPPORT_PROFILES
    if profiles_str:
        unwrapped = profiles_str.strip(' []')
        unquoted = unwrapped.replace("'", "").replace('"', "")
        # Strip each item: without this " b" from "a, b" keeps its blank
        # and the membership test below can miss an existing entry.
        profiles_list = [
            item.strip() for item in unquoted.split(',') if item.strip()
        ]
    if 'native_k8sapi' not in profiles_list:
        profiles_list.append('native_k8sapi')
    return profiles_list