Merge "Support mTLS (Mutual TLS) with O2 API"
[pti/o2.git] / o2common / config / config.py
index c93d388..61c8c69 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2021-2022 Wind River Systems, Inc.
+# Copyright (C) 2021-2024 Wind River Systems, Inc.
 #
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
 
 import os
 import sys
+import ipaddress
 from urllib.parse import urlparse
 
 from o2common import config
@@ -117,6 +118,15 @@ def get_stx_client_args():
     return client_args
 
 
+def is_ipv6(address):
+    try:
+        # Try to convert the address and check the IP version
+        ip = ipaddress.ip_address(address)
+        return ip.version == 6
+    except ValueError:
+        return False
+
+
 def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
                         sub_is_https: bool = False):
     # authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3")
@@ -140,6 +150,8 @@ def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
     for key, val in client_args.items():
         os_client_args['os_{key}'.format(key=key)] = val
     if "" != subcloud_hostname:
+        if is_ipv6(subcloud_hostname):
+            subcloud_hostname = "[" + subcloud_hostname + "]"
         orig_auth_url = urlparse(_DEFAULT_STX_URL)
         new_auth_url = orig_auth_url._replace(
             netloc=orig_auth_url.netloc.replace(
@@ -181,9 +193,11 @@ def get_dc_access_info():
     for key, val in client_args.items():
         os_client_args['os_{key}'.format(key=key)] = val
     auth_url = urlparse(os_client_args.pop('os_auth_url'))
+    hostname = f"[{auth_url.hostname}]" if is_ipv6(auth_url.hostname) \
+        else auth_url.hostname
     dcmanager_url = urlparse(_DEFAULT_DCMANAGER_URL)
     dcmanager_url = dcmanager_url._replace(netloc=dcmanager_url.netloc.replace(
-        dcmanager_url.hostname, auth_url.hostname))
+        dcmanager_url.hostname, hostname))
 
     os_client_args['dcmanager_url'] = dcmanager_url.geturl()
     os_client_args['auth_url'] = auth_url.geturl()
@@ -219,6 +233,8 @@ def get_fm_access_info(subcloud_hostname: str = "",
     os_client_args['auth_url'] = auth_url.geturl()
 
     if "" != subcloud_hostname:
+        subcloud_hostname = f"[{subcloud_hostname}]" if \
+            is_ipv6(subcloud_hostname) else subcloud_hostname
         orig_auth_url = urlparse(_DEFAULT_STX_URL)
         new_auth_url = orig_auth_url._replace(
             netloc=orig_auth_url.netloc.replace(