logger = o2logging.get_logger(__name__)
-_DEFAULT_DCMANAGER_URL = "http://192.168.204.1:8119/v1.0"
+CGTS_INSECURE_SSL = os.environ.get("CGTS_INSECURE_SSL", "0") == "1"
+
_DEFAULT_STX_URL = "http://192.168.204.1:5000/v3"
+_DCMANAGER_URL_PORT = os.environ.get("DCMANAGER_API_PORT", "8119")
+_DCMANAGER_URL_PATH = os.environ.get("DCMANAGER_API_PATH", "/v1.0")
def get_config_path():
return f"https://{host}:{port}"
+def get_stx_url():
+ try:
+ return get_stx_client_args()["auth_url"]
+ except KeyError:
+ logger.error('Please source your RC file before execution, '
+ 'e.g.: `source ~/downloads/admin-rc.sh`')
+ sys.exit(1)
+
+
+def get_dc_manager_url():
+ auth_url = os.environ.get("DCMANAGER_OS_AUTH_URL", None)
+ if auth_url is None:
+ temp_url = get_stx_url()
+ u = urlparse(temp_url)
+ u = u._replace(netloc=f"{u.hostname}:{_DCMANAGER_URL_PORT}")
+ u = u._replace(path=_DCMANAGER_URL_PATH)
+ auth_url = u.geturl()
+ return auth_url
+
+
def get_root_api_base():
return "/"
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
sub_is_https: bool = False):
- # authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3")
- # username = os.environ.get("STX_USERNAME", "admin")
- # pswd = os.environ.get("STX_PASSWORD", "passwd1")
- # stx_access_info = (authurl, username, pswd)
try:
- # client_args = dict(
- # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- # username=os.environ.get('OS_USERNAME', "admin"),
- # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- # )
client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
if "" != subcloud_hostname:
if is_ipv6(subcloud_hostname):
subcloud_hostname = "[" + subcloud_hostname + "]"
- orig_auth_url = urlparse(_DEFAULT_STX_URL)
+ orig_auth_url = urlparse(get_stx_url())
new_auth_url = orig_auth_url._replace(
netloc=orig_auth_url.netloc.replace(
orig_auth_url.hostname, subcloud_hostname))
new_auth_url = new_auth_url._replace(
scheme=new_auth_url.scheme.
replace(new_auth_url.scheme, 'https'))
- os_client_args['insecure'] = True
+ os_client_args['insecure'] = CGTS_INSECURE_SSL
os_client_args['os_auth_url'] = new_auth_url.geturl()
os_client_args['os_endpoint_type'] = 'public'
# os_client_args['system_url'] = os_client_args['os_auth_url']
def get_dc_access_info():
try:
- # client_args = dict(
- # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- # username=os.environ.get('OS_USERNAME', "admin"),
- # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- # )
client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
auth_url = urlparse(os_client_args.pop('os_auth_url'))
hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \
else auth_url.hostname
- dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL)
+ dcmanager_url = urlparse(get_dc_manager_url())
dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace(
dcmanager_url.hostname, hostname))
def get_fm_access_info(subcloud_hostname: str = "",
sub_is_https: bool = False):
try:
- # client_args = dict(
- # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- # username=os.environ.get('OS_USERNAME', "admin"),
- # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- # )
client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
if "" != subcloud_hostname:
subcloud_hostname = f"[{subcloud_hostname}]" if \
is_ipv6(subcloud_hostname) else subcloud_hostname
- orig_auth_url = urlparse(_DEFAULT_STX_URL)
+ orig_auth_url = urlparse(get_stx_url())
new_auth_url = orig_auth_url._replace(
netloc=orig_auth_url.netloc.replace(
orig_auth_url.hostname, subcloud_hostname))
os_client_args['auth_url'] = new_auth_url.geturl()
os_client_args['endpoint_type'] = 'publicURL'
- os_client_args['insecure'] = True
+ os_client_args['insecure'] = CGTS_INSECURE_SSL
os_client_args['username'] = os_client_args.pop('os_username')
os_client_args['password'] = os_client_args.pop('os_api_key')