61c8c692d9432dcfb3b2e4c0da8cb27b8cd1c3c4
[pti/o2.git] / o2common / config / config.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import os
16 import sys
17 import ipaddress
18 from urllib.parse import urlparse
19
20 from o2common import config
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
# URL template for the DC manager endpoint; get_dc_access_info() replaces
# its host part with the host of the configured auth URL.
_DEFAULT_DCMANAGER_URL = "http://192.168.204.1:8119/v1.0"
# Fallback for OS_AUTH_URL, also used as the URL template whose host part
# is replaced with a subcloud hostname when one is supplied.
_DEFAULT_STX_URL = "http://192.168.204.1:5000/v3"
27
28
def get_config_path():
    """Return the path of the o2app configuration file.

    The ``O2APP_CONFIG`` environment variable wins when it is set;
    otherwise ``/configs/o2app.conf`` is returned.
    """
    return os.getenv("O2APP_CONFIG", "/configs/o2app.conf")
32
33
def get_smo_ca_config_path():
    """Return the path of the SMO CA certificate file.

    Reads the ``SMO_CA_CONFIG`` environment variable and falls back to
    ``/configs/smoca.crt`` when it is not set.
    """
    return os.getenv("SMO_CA_CONFIG", "/configs/smoca.crt")
37
38
def get_postgres_uri():
    """Build the PostgreSQL connection URI from the environment.

    Environment variables read (with their defaults):
      * ``DB_HOST`` (``localhost``)
      * ``DB_PORT`` (``5432``), converted with ``int()``
      * ``DB_PASSWORD`` (``o2ims123``)

    The user and database name are both fixed to ``o2ims``.

    Returns:
        A ``postgresql://user:password@host:port/db`` string.

    Raises:
        ValueError: if ``DB_PORT`` is not an integer literal.
    """
    # Local import: the module only imports ``urlparse`` at file level.
    from urllib.parse import quote

    host = os.environ.get("DB_HOST", "localhost")
    port = int(os.environ.get("DB_PORT", 5432))
    # Percent-encode the password so characters such as '@', '/', ':' or
    # '%' cannot be mistaken for URI delimiters.
    password = quote(os.environ.get("DB_PASSWORD", "o2ims123"), safe="")
    user, db_name = "o2ims", "o2ims"
    return f"postgresql://{user}:{password}@{host}:{port}/{db_name}"
45
46
def get_api_url():
    """Return the base ``https://host:port`` URL of the O2 API.

    The external (floating) host comes from the
    ``API_HOST_EXTERNAL_FLOATING`` environment variable and is overridden
    by ``config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING`` when that option
    is neither ``None`` nor empty.

    * With an external host: that host and port ``30205`` are used.
    * Without one: the ``API_HOST`` environment variable (default
      ``localhost``) is used, with port ``5005`` for ``localhost`` and
      ``80`` for anything else.

    An IPv6 literal host is wrapped in square brackets so the result is a
    valid URL.
    """
    internal_host = os.environ.get("API_HOST", "localhost")
    external_host = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        external_host = configured

    if external_host is None or external_host == '':
        host = internal_host
        port = 5005 if host == "localhost" else 80
    else:
        host = external_host
        port = 30205

    # A bare IPv6 address would make the ':' before the port ambiguous.
    if is_ipv6(host):
        host = f"[{host}]"
    return f"https://{host}:{port}"
61
62
def get_root_api_base():
    """Return the root path under which the API bases are mounted."""
    root = "/"
    return root
65
66
def get_o2ims_api_base():
    """Return the base path of the O2 IMS infrastructure inventory API."""
    return f"{get_root_api_base()}o2ims-infrastructureInventory"
69
70
def get_o2ims_monitoring_api_v1():
    """Return the v1 version path segment of the monitoring API."""
    version_path = '/v1'
    return version_path
73
74
def get_o2ims_inventory_api_v1():
    """Return the v1 version path segment of the inventory API."""
    version_path = '/v1'
    return version_path
77
78
def get_o2ims_monitoring_api_base():
    """Return the base path of the O2 IMS infrastructure monitoring API."""
    return f"{get_root_api_base()}o2ims-infrastructureMonitoring"
81
82
def get_o2dms_api_base():
    """Return the base path of the O2 DMS API (version 1)."""
    return f"{get_root_api_base()}o2dms/v1"
85
86
def get_redis_host_and_port():
    """Return the Redis connection settings as a ``host``/``port`` dict.

    ``REDIS_HOST`` defaults to ``localhost`` and ``REDIS_PORT`` to
    ``6379``; the port is converted to ``int``.
    """
    redis_host = os.getenv("REDIS_HOST", "localhost")
    redis_port = int(os.getenv("REDIS_PORT", 6379))
    return {"host": redis_host, "port": redis_port}
91
92
def get_smo_o2endpoint():
    """Return the SMO O2 endpoint URL.

    Taken from the ``SMO_O2_ENDPOINT`` environment variable, defaulting
    to ``http://localhost/smo_sim``.
    """
    return os.getenv("SMO_O2_ENDPOINT", "http://localhost/smo_sim")
97
98
def get_stx_client_args():
    """Collect the client credentials from the environment and config.

    Each value is first read from an ``OS_*`` environment variable (with
    a built-in fallback) and then replaced by the option of the same name
    under ``config.conf.OCLOUD`` when that option is neither ``None`` nor
    an empty string.

    Returns:
        dict with the keys ``auth_url``, ``username``, ``api_key`` and
        ``project_name``.
    """
    # (client argument, environment variable / OCLOUD option, fallback)
    settings = (
        ('auth_url', 'OS_AUTH_URL', _DEFAULT_STX_URL),
        ('username', 'OS_USERNAME', "admin"),
        ('api_key', 'OS_PASSWORD', "fakepasswd1"),
        ('project_name', 'OS_PROJECT_NAME', "admin"),
    )
    client_args = {
        arg: os.environ.get(option, fallback)
        for arg, option, fallback in settings
    }
    for arg, option, _fallback in settings:
        configured = getattr(config.conf.OCLOUD, option)
        if configured is not None and configured != '':
            client_args[arg] = configured
    return client_args
119
120
def is_ipv6(address):
    """Return True when *address* parses as an IPv6 address.

    Anything ``ipaddress.ip_address`` rejects with ``ValueError`` (host
    names, bracketed literals, garbage) yields False, as does IPv4.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        return False
    return parsed.version == 6
128
129
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
                        sub_is_https: bool = False):
    """Build the ``os_*`` keyword arguments for the STX client.

    Args:
        region_name: stored under ``os_region_name``.
        subcloud_hostname: when non-empty, the auth URL is rebuilt from
            ``_DEFAULT_STX_URL`` with this host (IPv6 literals are
            bracketed) and ``os_endpoint_type`` is set to ``public``.
        sub_is_https: with a subcloud host, switch the rebuilt URL to
            ``https`` and set ``insecure`` to True.

    Returns:
        dict of client arguments; ``os_api_key`` is renamed to
        ``os_password`` and ``api_version`` is set to 1.
    """
    try:
        base_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    # Prefix every credential key with "os_".
    access = {f'os_{name}': value for name, value in base_args.items()}

    if subcloud_hostname != "":
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = "[" + subcloud_hostname + "]"
        template = urlparse(_DEFAULT_STX_URL)
        # Keep the template's port and path, swap only the host.
        subcloud_url = template._replace(
            netloc=template.netloc.replace(
                template.hostname, subcloud_hostname))
        if sub_is_https:
            subcloud_url = subcloud_url._replace(scheme='https')
            access['insecure'] = True
        access['os_auth_url'] = subcloud_url.geturl()
        access['os_endpoint_type'] = 'public'

    access['os_password'] = access.pop('os_api_key')
    access['os_region_name'] = region_name
    access['api_version'] = 1
    return access
176
177
def get_dc_access_info():
    """Build the keyword arguments for the DC manager client.

    The DC manager URL is derived from ``_DEFAULT_DCMANAGER_URL`` with its
    host replaced by the host of the configured auth URL (bracketed when
    it is an IPv6 literal).

    Returns:
        dict with ``dcmanager_url``, ``auth_url``, ``username``,
        ``api_key``, ``project_name`` and both domain names set to
        ``Default``.
    """
    try:
        base_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    access = {f'os_{name}': value for name, value in base_args.items()}

    auth_url = urlparse(access.pop('os_auth_url'))
    auth_host = auth_url.hostname
    if is_ipv6(auth_host):
        auth_host = f"[{auth_host}]"

    template = urlparse(_DEFAULT_DCMANAGER_URL)
    # Keep the template's port and path, swap only the host.
    dcmanager_url = template._replace(
        netloc=template.netloc.replace(template.hostname, auth_host))

    access['dcmanager_url'] = dcmanager_url.geturl()
    access['auth_url'] = auth_url.geturl()
    # Drop the "os_" prefix from the remaining credentials.
    for name in ('username', 'api_key', 'project_name'):
        access[name] = access.pop('os_' + name)
    access['user_domain_name'] = 'Default'
    access['project_domain_name'] = 'Default'

    return access
211
212
def get_fm_access_info(subcloud_hostname: str = "",
                       sub_is_https: bool = False):
    """Build the keyword arguments for the FM client.

    Args:
        subcloud_hostname: when non-empty, ``auth_url`` is rebuilt from
            ``_DEFAULT_STX_URL`` with this host (IPv6 literals are
            bracketed) and ``endpoint_type`` is set to ``publicURL``.
        sub_is_https: with a subcloud host, switch the rebuilt URL to
            ``https``.

    Returns:
        dict with ``auth_url``, ``insecure`` (always True), ``username``,
        ``password``, ``project_name`` and both domain names set to
        ``Default`` (plus ``endpoint_type`` for subclouds).
    """
    try:
        base_args = get_stx_client_args()
    except KeyError:
        logger.error('Please source your RC file before execution, '
                     'e.g.: `source ~/downloads/admin-rc.sh`')
        sys.exit(1)

    access = {f'os_{name}': value for name, value in base_args.items()}
    access['auth_url'] = urlparse(access.pop('os_auth_url')).geturl()

    if subcloud_hostname != "":
        if is_ipv6(subcloud_hostname):
            subcloud_hostname = f"[{subcloud_hostname}]"
        template = urlparse(_DEFAULT_STX_URL)
        # Keep the template's port and path, swap only the host.
        subcloud_url = template._replace(
            netloc=template.netloc.replace(
                template.hostname, subcloud_hostname))
        if sub_is_https:
            subcloud_url = subcloud_url._replace(scheme='https')
        access['auth_url'] = subcloud_url.geturl()
        access['endpoint_type'] = 'publicURL'

    access['insecure'] = True

    # Rename the "os_"-prefixed credentials to the names used here.
    renames = (
        ('username', 'os_username'),
        ('password', 'os_api_key'),
        ('project_name', 'os_project_name'),
    )
    for target, source in renames:
        access[target] = access.pop(source)
    access['user_domain_name'] = 'Default'
    access['project_domain_name'] = 'Default'

    return access
258
259
def get_k8s_api_endpoint():
    """Return ``(kubeconfig, apiserver, token)`` read from the environment.

    The values come from ``K8S_KUBECONFIG``, ``K8S_APISERVER`` and
    ``K8S_TOKEN``; each is ``None`` when its variable is not set.
    """
    variables = ("K8S_KUBECONFIG", "K8S_APISERVER", "K8S_TOKEN")
    return tuple(os.environ.get(name) for name in variables)
265
266
def get_helm_cli():
    """Return the absolute path of the helm executable."""
    helm_path = '/usr/local/bin/helm'
    return helm_path
269
270
def get_containers_shared_folder():
    """Return the path of the folder shared between containers."""
    shared_folder = '/share'
    return shared_folder
273
274
def get_system_controller_as_respool():
    """Return the fixed system-controller-as-respool flag (always True)."""
    enabled = True
    return enabled
277
278
def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
                        admin_client_cert, admin_client_key):
    """Build a kubeconfig document as a plain dict.

    Layout reference:
    https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/

    Args:
        cluster_api_endpoint: value of the cluster's ``server`` field.
        cluster_ca_cert: value of ``certificate-authority-data``.
        admin_user: name of the single entry under ``users``.
        admin_client_cert: value of ``client-certificate-data``.
        admin_client_key: value of ``client-key-data``.

    Returns:
        dict with one cluster (``inf-cluster``), one user and one context
        (``kubernetes-admin@inf-cluster``) selected as current context.
    """
    cluster_name = 'inf-cluster'
    context_name = 'kubernetes-admin@inf-cluster'

    cluster_entry = {
        'cluster': {
            'server': cluster_api_endpoint,
            'certificate-authority-data': cluster_ca_cert,
        },
        'name': cluster_name,
    }
    user_entry = {
        'name': admin_user,
        'user': {
            'client-certificate-data': admin_client_cert,
            'client-key-data': admin_client_key,
        },
    }
    context_entry = {
        'context': {
            'cluster': cluster_name,
            # Must match the name of the users entry; a hard-coded name
            # leaves the context without credentials whenever admin_user
            # is anything else.
            'user': admin_user,
        },
        'name': context_name,
    }

    return {
        'apiVersion': 'v1',
        'clusters': [cluster_entry],
        'contexts': [context_entry],
        'current-context': context_name,
        'kind': 'Config',
        'preferences': {},
        'users': [user_entry],
    }
321
322
def get_helmcli_access():
    """Return ``(host:port, user, password)`` for reaching the helm CLI.

    The external host comes from ``API_HOST_EXTERNAL_FLOATING`` in the
    environment, overridden by the option of the same name under
    ``config.conf.OCLOUD`` when that is neither ``None`` nor empty.
    Without an external host ``127.0.0.1:10022`` is used; with one, the
    port is ``30022``. The user is always ``helm`` and the password is
    read from ``HELM_USER_PASSWD`` (``None`` when unset).
    """
    floating_host = os.environ.get("API_HOST_EXTERNAL_FLOATING")
    configured = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
    if configured is not None and configured != '':
        floating_host = configured

    if floating_host is None or floating_host == '':
        host, port = "127.0.0.1", "10022"
    else:
        host, port = floating_host, "30022"

    return host + ':' + port, 'helm', os.environ.get("HELM_USER_PASSWD")
338
339
def get_alarm_yaml_filename():
    """Return the alarm YAML file to load.

    The ``ALARM_YAML`` environment variable is honoured only when it
    names an existing regular file; otherwise ``/configs/alarm.yaml`` is
    returned.
    """
    override = os.environ.get("ALARM_YAML")
    if override is None or not os.path.isfile(override):
        return "/configs/alarm.yaml"
    return override
345
346
def get_events_yaml_filename():
    """Return the events YAML file to load.

    The ``EVENTS_YAML`` environment variable is honoured only when it
    names an existing regular file; otherwise ``/configs/events.yaml`` is
    returned.
    """
    override = os.environ.get("EVENTS_YAML")
    if override is None or not os.path.isfile(override):
        return "/configs/events.yaml"
    return override
352
353
354 # get k8s host from env:
def get_k8s_host():
    """Return the value of ``KUBERNETES_SERVICE_HOST``.

    Raises:
        Exception: when the environment variable is not set.
    """
    host = os.getenv("KUBERNETES_SERVICE_HOST")
    if host is not None:
        return host
    raise Exception('Get k8s host failed.')
360
361
362 # get k8s host port from env:
def get_k8s_port():
    """Return ``KUBERNETES_SERVICE_PORT_HTTPS`` as a string.

    Defaults to ``'443'`` when the variable is not set.
    """
    return os.getenv("KUBERNETES_SERVICE_PORT_HTTPS", '443')
366
367
368 # token review url
def get_review_url():
    """Return the URL of the Kubernetes TokenReview endpoint.

    The URL is ``https://<host>:<port>/apis/authentication.k8s.io/v1/
    tokenreviews`` with host and port taken from ``get_k8s_host()`` and
    ``get_k8s_port()``. An IPv6 literal host is wrapped in square
    brackets so the result is a valid URL.

    Raises:
        Exception: when the host or port cannot be determined; the
            original error is attached as the cause.
    """
    api = '/apis/authentication.k8s.io/v1/tokenreviews'
    try:
        host = get_k8s_host()
        # A bare IPv6 address would make the ':' before the port ambiguous.
        if is_ipv6(host):
            host = f"[{host}]"
        return "https://{0}:{1}{2}".format(host, get_k8s_port(), api)
    except Exception as err:
        # Chain the cause so the underlying failure stays visible.
        raise Exception('Get k8s review url failed') from err
376
377
378 # get reviewer token
def get_reviewer_token():
    """Return the full contents of the service-account token file.

    The token is read from
    ``/var/run/secrets/kubernetes.io/serviceaccount/token``; errors from
    opening or reading the file propagate to the caller.
    """
    with open('/var/run/secrets/kubernetes.io/serviceaccount/token',
              'r') as token_file:
        return token_file.read()
385
386
def get_auth_provider():
    """Return the identifier of the authentication provider in use."""
    provider = 'k8s'
    return provider
389
390
def get_dms_support_profiles():
    """Return the list of supported DMS profiles.

    ``config.conf.API.DMS_SUPPORT_PROFILES`` is read as a list-like
    string such as ``"['a', 'b']"`` or ``"a,b"``: surrounding brackets
    and spaces are removed, quotes are dropped and the rest is split on
    commas. Each entry is whitespace-stripped and empty entries are
    discarded, so ``"[]"`` yields no profiles and ``"a, native_k8sapi"``
    does not produce a duplicate.

    ``native_k8sapi`` is always present in the result; it is appended
    when the configuration does not list it.
    """
    profiles_list = []
    profiles_str = config.conf.API.DMS_SUPPORT_PROFILES
    if profiles_str:
        cleaned = profiles_str.strip(' []').replace("'", "").replace(
            '"', "")
        # Strip each entry: without this, "a, b" yields ' b' and a listed
        # ' native_k8sapi' is not recognised below.
        stripped = (entry.strip() for entry in cleaned.split(','))
        profiles_list = [entry for entry in stripped if entry]
    if 'native_k8sapi' not in profiles_list:
        profiles_list.append('native_k8sapi')
    return profiles_list