X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2common%2Fconfig%2Fconfig.py;h=2ad1da363de4227b2d9873ec965f5ebcde87e528;hb=HEAD;hp=61c8c692d9432dcfb3b2e4c0da8cb27b8cd1c3c4;hpb=b533c03e9233171cbd114339f98bd428688998a7;p=pti%2Fo2.git diff --git a/o2common/config/config.py b/o2common/config/config.py index 61c8c69..2ad1da3 100644 --- a/o2common/config/config.py +++ b/o2common/config/config.py @@ -22,8 +22,11 @@ from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -_DEFAULT_DCMANAGER_URL = "http://192.168.204.1:8119/v1.0" +CGTS_INSECURE_SSL = os.environ.get("CGTS_INSECURE_SSL", "0") == "1" + _DEFAULT_STX_URL = "http://192.168.204.1:5000/v3" +_DCMANAGER_URL_PORT = os.environ.get("DCMANAGER_API_PORT", "8119") +_DCMANAGER_URL_PATH = os.environ.get("DCMANAGER_API_PATH", "/v1.0") def get_config_path(): @@ -60,6 +63,31 @@ def get_api_url(): return f"https://{host}:{port}" +def get_region_name(): + region_name = os.environ.get("OS_REGION_NAME", "RegionOne") + return region_name + + +def get_stx_url(): + try: + return get_stx_client_args()["auth_url"] + except KeyError: + logger.error('Please source your RC file before execution, ' + 'e.g.: `source ~/downloads/admin-rc.sh`') + sys.exit(1) + + +def get_dc_manager_url(): + auth_url = os.environ.get("DCMANAGER_OS_AUTH_URL", None) + if auth_url is None: + temp_url = get_stx_url() + u = urlparse(temp_url) + u = u._replace(netloc=f"{u.hostname}:{_DCMANAGER_URL_PORT}") + u = u._replace(path=_DCMANAGER_URL_PATH) + auth_url = u.geturl() + return auth_url + + def get_root_api_base(): return "/" @@ -127,19 +155,10 @@ def is_ipv6(address): return False -def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", +def get_stx_access_info(region_name=get_region_name(), + subcloud_hostname: str = "", sub_is_https: bool = False): - # authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3") - # username = os.environ.get("STX_USERNAME", "admin") - # pswd = os.environ.get("STX_PASSWORD", "passwd1") - # stx_access_info = (authurl, username, pswd) try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -152,7 +171,7 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", if "" != subcloud_hostname: if is_ipv6(subcloud_hostname): subcloud_hostname = "[" + subcloud_hostname + "]" - orig_auth_url = urlparse(_DEFAULT_STX_URL) + orig_auth_url = urlparse(get_stx_url()) new_auth_url = orig_auth_url._replace( netloc=orig_auth_url.netloc.replace( orig_auth_url.hostname, subcloud_hostname)) @@ -163,7 +182,7 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", new_auth_url = new_auth_url._replace( scheme=new_auth_url.scheme. replace(new_auth_url.scheme, 'https')) - os_client_args['insecure'] = True + os_client_args['insecure'] = CGTS_INSECURE_SSL os_client_args['os_auth_url'] = new_auth_url.geturl() os_client_args['os_endpoint_type'] = 'public' # os_client_args['system_url'] = os_client_args['os_auth_url'] @@ -177,12 +196,6 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "", def get_dc_access_info(): try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -195,7 +208,7 @@ def get_dc_access_info(): auth_url = urlparse(os_client_args.pop('os_auth_url')) hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \ else auth_url.hostname - dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL) + dcmanager_url = urlparse(get_dc_manager_url()) dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace( dcmanager_url.hostname, hostname)) @@ -213,12 +226,6 @@ def get_dc_access_info(): def get_fm_access_info(subcloud_hostname: str = "", sub_is_https: bool = False): try: - # client_args = dict( - # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL), - # username=os.environ.get('OS_USERNAME', "admin"), - # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"), - # project_name=os.environ.get('OS_PROJECT_NAME', "admin"), - # ) client_args = get_stx_client_args() except KeyError: logger.error('Please source your RC file before execution, ' @@ -235,7 +242,7 @@ def get_fm_access_info(subcloud_hostname: str = "", if "" != subcloud_hostname: subcloud_hostname = f"[{subcloud_hostname}]" if \ is_ipv6(subcloud_hostname) else subcloud_hostname - orig_auth_url = urlparse(_DEFAULT_STX_URL) + orig_auth_url = urlparse(get_stx_url()) new_auth_url = orig_auth_url._replace( netloc=orig_auth_url.netloc.replace( orig_auth_url.hostname, subcloud_hostname)) @@ -246,7 +253,7 @@ def get_fm_access_info(subcloud_hostname: str = "", os_client_args['auth_url'] = new_auth_url.geturl() os_client_args['endpoint_type'] = 'publicURL' - os_client_args['insecure'] = True + os_client_args['insecure'] = CGTS_INSECURE_SSL os_client_args['username'] = os_client_args.pop('os_username') os_client_args['password'] = os_client_args.pop('os_api_key') @@ -385,7 +392,7 @@ def get_reviewer_token(): def get_auth_provider(): - return 'k8s' + return config.conf.auth_provider def get_dms_support_profiles():