X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=solution%2Fsmo%2Fcommon%2Fidentity%2Fconfig.py;h=5a8bf44defe8c14ee6e235b212aa3c55be6df8e6;hb=refs%2Fchanges%2F05%2F11505%2F2;hp=393d9189f26fa40eeb757be2f7ba3797a85704a2;hpb=abe80e0619c67eef33bf16927d183a5eb9d6db1b;p=oam.git diff --git a/solution/smo/common/identity/config.py b/solution/smo/common/identity/config.py index 393d918..5a8bf44 100644 --- a/solution/smo/common/identity/config.py +++ b/solution/smo/common/identity/config.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -################################################################################ -# Copyright 2021 highstreet technologies GmbH +############################################################################# +# Copyright 2023 highstreet technologies GmbH # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import json import time import getpass import requests +import re import warnings from jproperties import Properties from typing import List @@ -32,11 +33,21 @@ warnings.filterwarnings('ignore', message='Unverified HTTPS request') def get_environment_variable(name): configs = Properties() - path = pathlib.Path( os.path.dirname(os.path.abspath(__file__)) ) + path = pathlib.Path(os.path.dirname(os.path.abspath(__file__))) env_file = str(path.parent.absolute()) + '/.env' with open(env_file, "rb") as read_prop: configs.load(read_prop) - return configs.get(name).data + value = configs.get(name).data + + regex = r"\$\{([^\}]+)\}" + matches = re.finditer(regex, value) + while True: + match = next(matches, None) + if match is None: + break + inner = get_environment_variable(match.group(1)) + value = value.replace("${" + match.group(1) + "}", inner ) + return value def load_arguments(args: List[str]) -> tuple: @@ -62,12 +73,13 @@ def load_arguments(args: List[str]) -> tuple: def isReady(timeoutSeconds=180): - url = getBaseUrl(); + url = getBaseUrl() + print(f'url={url}') while timeoutSeconds > 0: try: response = requests.get(url, verify=False, headers={}) except: - pass + response = None if response is not None and response.status_code == 200: return True time.sleep(1) @@ -79,6 +91,8 @@ def getBaseUrl(): return get_environment_variable("IDENTITY_PROVIDER_URL") # Request a token for further communication + + def getToken(): url = base + '/realms/master/protocol/openid-connect/token' headers = { @@ -204,10 +218,10 @@ def createUsers(token, realmConfig, authConfig): "value": password, "temporary": True } - ], - "requiredActions": [ - "UPDATE_PASSWORD" - ] + ], + "requiredActions": [ + "UPDATE_PASSWORD" + ] } createUser(token, realmConfig, systemUser) @@ -218,7 +232,8 @@ def addUserRole(user: dict, role: dict, options: dict): url = options['url'] + '/' + user['id'] + '/role-mappings/realm' try: response = requests.post(url, verify=False, json=[ - {'id': role['id'], 'name':role['name']}], headers=options['headers']) + {'id': role['id'], 'name':role['name']}], + headers=options['headers']) except requests.exceptions.Timeout: sys.exit('HTTP request failed, please check you internet connection.') except requests.exceptions.TooManyRedirects: @@ -235,6 +250,7 @@ def addUserRole(user: dict, role: dict, options: dict): # searches for the role of a given user + def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict: roleName = 'administration' for grant in authConfig['grants']: