-# Copyright (C) 2021-2022 Wind River Systems, Inc.
+# Copyright (C) 2021-2024 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import os
import sys
+import ipaddress
from urllib.parse import urlparse
from o2common import config
def get_postgres_uri():
host = os.environ.get("DB_HOST", "localhost")
- port = 54321 if host == "localhost" else 5432
+ port = int(os.environ.get("DB_PORT", 5432))
password = os.environ.get("DB_PASSWORD", "o2ims123")
user, db_name = "o2ims", "o2ims"
return f"postgresql://{user}:{password}@{host}:{port}/{db_name}"
return get_root_api_base() + 'o2ims-infrastructureInventory'
+def get_o2ims_monitoring_api_v1():
+ return '/v1'
+
+
+def get_o2ims_inventory_api_v1():
+ return '/v1'
+
+
def get_o2ims_monitoring_api_base():
return get_root_api_base() + 'o2ims-infrastructureMonitoring'
def get_redis_host_and_port():
host = os.environ.get("REDIS_HOST", "localhost")
- port = 63791 if host == "localhost" else 6379
+ port = int(os.environ.get("REDIS_PORT", 6379))
return dict(host=host, port=port)
return client_args
+def is_ipv6(address):
+ try:
+ # Try to convert the address and check the IP version
+ ip = ipaddress.ip_address(address)
+ return ip.version == 6
+ except ValueError:
+ return False
+
+
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
sub_is_https: bool = False):
# authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3")
for key, val in client_args.items():
os_client_args['os_{key}'.format(key=key)] = val
if "" != subcloud_hostname:
+ if is_ipv6(subcloud_hostname):
+ subcloud_hostname = "[" + subcloud_hostname + "]"
orig_auth_url = urlparse(_DEFAULT_STX_URL)
new_auth_url = orig_auth_url._replace(
netloc=orig_auth_url.netloc.replace(
for key, val in client_args.items():
os_client_args['os_{key}'.format(key=key)] = val
auth_url = urlparse(os_client_args.pop('os_auth_url'))
+ hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \
+ else auth_url.hostname
dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL)
dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace(
- dcmanager_url.hostname, auth_url.hostname))
+ dcmanager_url.hostname, hostname))
os_client_args['dcmanager_url'] = dcmanager_url.geturl()
os_client_args['auth_url'] = auth_url.geturl()
os_client_args['auth_url'] = auth_url.geturl()
if "" != subcloud_hostname:
+ subcloud_hostname = f"[{subcloud_hostname}]" if \
+ is_ipv6(subcloud_hostname) else subcloud_hostname
orig_auth_url = urlparse(_DEFAULT_STX_URL)
new_auth_url = orig_auth_url._replace(
netloc=orig_auth_url.netloc.replace(
return '/usr/local/bin/helm'
+def get_containers_shared_folder():
+ return '/share'
+
+
def get_system_controller_as_respool():
return True
def get_dms_support_profiles():
- profiles = config.conf.API.DMS_SUPPORT_PROFILES
- if profiles is None or profiles == '':
- profiles = []
- elif "[" in profiles and "]" in profiles:
- profiles = profiles.replace("'", "").replace(
- '"', "").replace('[', "").replace(']', "")
- profiles = profiles.split(',')
- if 'native_k8sapi' not in profiles:
- profiles.append('native_k8sapi')
- return profiles
+ profiles_list = []
+ profiles_str = config.conf.API.DMS_SUPPORT_PROFILES
+ if profiles_str:
+ profiles_strip = profiles_str.strip(' []')
+ profiles_str = profiles_strip.replace("'", "").replace(
+ '"', "")
+ profiles_list = profiles_str.split(',')
+ if 'native_k8sapi' not in profiles_list:
+ profiles_list.append('native_k8sapi')
+ return profiles_list