X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2common%2Fconfig%2Fconfig.py;h=f8eee94b0700064a285617a97d3f0e36aff261b3;hb=d7c14ad6506b2f1a85246c9e1d08d0d64e9df7f2;hp=f488b7108e0fd9616298a776b02e50972060ddd6;hpb=105b23b181c24df87bbc55cfceb906483c89917c;p=pti%2Fo2.git diff --git a/o2common/config/config.py b/o2common/config/config.py index f488b71..f8eee94 100644 --- a/o2common/config/config.py +++ b/o2common/config/config.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-2022 Wind River Systems, Inc. +# Copyright (C) 2021-2024 Wind River Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ import os import sys +import ipaddress from urllib.parse import urlparse from o2common import config @@ -21,8 +22,11 @@ from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -_DEFAULT_DCMANAGER_URL = "http://192.168.204.1:8119/v1.0" +CGTS_INSECURE_SSL = os.environ.get("CGTS_INSECURE_SSL", "0") == "1" + _DEFAULT_STX_URL = "http://192.168.204.1:5000/v3" +_DCMANAGER_URL_PORT = os.environ.get("DCMANAGER_API_PORT", "8119") +_DCMANAGER_URL_PATH = os.environ.get("DCMANAGER_API_PATH", "/v1.0") def get_config_path(): @@ -37,7 +41,7 @@ def get_smo_ca_config_path(): def get_postgres_uri(): host = os.environ.get("DB_HOST", "localhost") - port = 54321 if host == "localhost" else 5432 + port = int(os.environ.get("DB_PORT", 5432)) password = os.environ.get("DB_PASSWORD", "o2ims123") user, db_name = "o2ims", "o2ims" return f"postgresql://{user}:{password}@{host}:{port}/{db_name}" @@ -59,6 +63,26 @@ def get_api_url(): return f"https://{host}:{port}" +def get_stx_url(): + try: + return get_stx_client_args()["auth_url"] + except KeyError: + logger.error('Please source your RC file before execution, ' + 'e.g.: `source ~/downloads/admin-rc.sh`') + sys.exit(1) + + +def get_dc_manager_url(): + auth_url = os.environ.get("DCMANAGER_OS_AUTH_URL", None) + if auth_url is None: + temp_url = get_stx_url() + u = urlparse(temp_url) + u = u._replace(netloc=f"{u.hostname}:{_DCMANAGER_URL_PORT}") + u = u._replace(path=_DCMANAGER_URL_PATH) + auth_url = u.geturl() + return auth_url + + def get_root_api_base(): return "/" @@ -67,6 +91,14 @@ def get_o2ims_api_base(): return get_root_api_base() + 'o2ims-infrastructureInventory' +def get_o2ims_monitoring_api_v1(): + return '/v1' + + +def get_o2ims_inventory_api_v1(): + return '/v1' + + def get_o2ims_monitoring_api_base(): return get_root_api_base() + 'o2ims-infrastructureMonitoring' @@ -77,7 +109,7 @@ def get_o2dms_api_base(): def get_redis_host_and_port(): host = os.environ.get("REDIS_HOST", "localhost") - port = 63791 if host == "localhost" else 6379 + port = int(os.environ.get("REDIS_PORT", 6379)) return dict(host=host, port=port) @@ -109,19 +141,18 @@ def get_stx_client_args(): return client_args +def is_ipv6(address): + try: + # Try to convert the address and check the IP version + ip = ipaddress.ip_address(address) + return ip.version == 6 + except ValueError: + return False + + def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", sub_is_https: bool = False): - # authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3") - # username = os.environ.get("STX_USERNAME", "admin") - # pswd = os.environ.get("STX_PASSWORD", "passwd1") - # stx_access_info = (authurl, username, pswd) try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -132,7 +163,9 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", for key, val in client_args.items(): os_client_args['os_{key}'.format(key=key)] = val if "" != subcloud_hostname: - orig_auth_url = urlparse(_DEFAULT_STX_URL) + if is_ipv6(subcloud_hostname): + subcloud_hostname = "[" + subcloud_hostname + "]" + orig_auth_url = urlparse(get_stx_url()) new_auth_url = orig_auth_url._replace( netloc=orig_auth_url.netloc.replace( orig_auth_url.hostname, subcloud_hostname)) @@ -143,7 +176,7 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", new_auth_url = new_auth_url._replace( scheme=new_auth_url.scheme. replace(new_auth_url.scheme, 'https')) - os_client_args['insecure'] = True + os_client_args['insecure'] = CGTS_INSECURE_SSL os_client_args['os_auth_url'] = new_auth_url.geturl() os_client_args['os_endpoint_type'] = 'public' # os_client_args['system_url'] = os_client_args['os_auth_url'] @@ -157,12 +190,6 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", def get_dc_access_info(): try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -173,9 +200,11 @@ def get_dc_access_info(): for key, val in client_args.items(): os_client_args['os_{key}'.format(key=key)] = val auth_url = urlparse(os_client_args.pop('os_auth_url')) - dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL) + hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \ + else auth_url.hostname + dcmanager_url = urlparse(get_dc_manager_url()) dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace( - dcmanager_url.hostname, auth_url.hostname)) + dcmanager_url.hostname, hostname)) os_client_args['dcmanager_url'] = dcmanager_url.geturl() os_client_args['auth_url'] = auth_url.geturl() @@ -188,14 +217,9 @@ def get_dc_access_info(): return os_client_args -def get_fm_access_info(): +def get_fm_access_info(subcloud_hostname: str = "", + sub_is_https: bool = False): try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -205,11 +229,26 @@ def get_fm_access_info(): os_client_args = {} for key, val in client_args.items(): os_client_args['os_{key}'.format(key=key)] = val + auth_url = urlparse(os_client_args.pop('os_auth_url')) + os_client_args['auth_url'] = auth_url.geturl() - os_client_args['insecure'] = True + if "" != subcloud_hostname: + subcloud_hostname = f"[{subcloud_hostname}]" if \ + is_ipv6(subcloud_hostname) else subcloud_hostname + orig_auth_url = urlparse(get_stx_url()) + new_auth_url = orig_auth_url._replace( + netloc=orig_auth_url.netloc.replace( + orig_auth_url.hostname, subcloud_hostname)) + if sub_is_https: + new_auth_url = new_auth_url._replace( + scheme=new_auth_url.scheme. + replace(new_auth_url.scheme, 'https')) + os_client_args['auth_url'] = new_auth_url.geturl() + os_client_args['endpoint_type'] = 'publicURL' + + os_client_args['insecure'] = CGTS_INSECURE_SSL - os_client_args['auth_url'] = auth_url.geturl() os_client_args['username'] = os_client_args.pop('os_username') os_client_args['password'] = os_client_args.pop('os_api_key') os_client_args['project_name'] = os_client_args.pop('os_project_name') @@ -230,6 +269,10 @@ def get_helm_cli(): return '/usr/local/bin/helm' +def get_containers_shared_folder(): + return '/share' + + def get_system_controller_as_respool(): return True @@ -343,17 +386,17 @@ def get_reviewer_token(): def get_auth_provider(): - return 'k8s' + return config.conf.auth_provider def get_dms_support_profiles(): - profiles = config.conf.API.DMS_SUPPORT_PROFILES - if profiles is None or profiles == '': - profiles = [] - elif "[" in profiles and "]" in profiles: - profiles = profiles.replace("'", "").replace( - '"', "").replace('[', "").replace(']', "") - profiles = profiles.split(',') - if 'native_k8sapi' not in profiles: - profiles.append('native_k8sapi') - return profiles + profiles_list = [] + profiles_str = config.conf.API.DMS_SUPPORT_PROFILES + if profiles_str: + profiles_strip = profiles_str.strip(' []') + profiles_str = profiles_strip.replace("'", "").replace( + '"', "") + profiles_list = profiles_str.split(',') + if 'native_k8sapi' not in profiles_list: + profiles_list.append('native_k8sapi') + return profiles_list