X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=solution%2Fintegration%2Fsmo%2Fcommon%2Fidentity%2Fconfig.py;fp=solution%2Fintegration%2Fsmo%2Fcommon%2Fidentity%2Fconfig.py;h=0000000000000000000000000000000000000000;hb=990de7205645f827a36429006f7e5529b5fb6866;hp=1ff53accb15f63b4a2ce3f270f773def659fa227;hpb=a3be9ade3bec5ca8cd5b41032704872f87d8c065;p=oam.git diff --git a/solution/integration/smo/common/identity/config.py b/solution/integration/smo/common/identity/config.py deleted file mode 100644 index 1ff53ac..0000000 --- a/solution/integration/smo/common/identity/config.py +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python -################################################################################ -# Copyright 2021 highstreet technologies GmbH -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# importing the sys, json, requests library -import os -import sys -import json -import time -import getpass -import requests -import warnings -from jproperties import Properties -from typing import List -warnings.filterwarnings('ignore', message='Unverified HTTPS request') -# global configurations - - -def get_environment_variable(name): - configs = Properties() - env_file = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env' - with open(env_file, "rb") as read_prop: - configs.load(read_prop) - return configs.get(name).data - - -def load_arguments(args: List[str]) -> tuple: - realm_file = os.path.dirname(os.path.abspath( - __file__)) + '/o-ran-sc-realm.json' - auth_file = os.path.dirname(os.path.abspath( - __file__)) + '/authentication.json' - ready_timeout = 180 - args.pop(0) - while len(args) > 0: - arg = args.pop(0) - if arg == '--auth' and len(args) > 0: - auth_file = args.pop(0) - print('overwriting auth file: {}'.format(auth_file)) - elif arg == '--realm' and len(args) > 0: - realm_file = args.pop(0) - print('overwriting realm file: {}'.format(realm_file)) - elif arg == '--timeout' and len(args) > 0: - ready_timeout = int(args.pop(0)) - print('waiting for ready {} seconds'.format(ready_timeout)) - - return (realm_file, auth_file, ready_timeout) - - -def isReady(timeoutSeconds=180): - url = getBaseUrl() - while timeoutSeconds > 0: - try: - response = requests.get(url, verify=False, headers={}) - except: - pass - if response is not None and response.status_code == 200: - return True - time.sleep(1) - timeoutSeconds -= 1 - return False - - -def getBaseUrl(): - return get_environment_variable("IDENTITY_PROVIDER_URL") - -# Request a token for further communication -def getToken(): - url = base + '/realms/master/protocol/openid-connect/token' - headers = { - 'content-type': 'application/x-www-form-urlencoded', - 'accept': 'application/json' - } - body = { - 'client_id': 'admin-cli', - 'grant_type': 'password', - 'username': username, - 'password': password - } - try: - response = requests.post(url, verify=False, auth=( - username, password), data=body, headers=headers) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - if response.status_code >= 200 and response.status_code < 300: - print('Got token!') - return response.json()['access_token'] - else: - sys.exit('Getting token failed.') - -# create the default realm from file - - -def createRealm(token, realm): - url = base + '/admin/realms' - auth = 'bearer ' + token - headers = { - 'content-type': 'application/json', - 'accept': 'application/json', - 'authorization': auth - } - try: - response = requests.post( - url, verify=False, json=realm, headers=headers) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - return response.status_code >= 200 and response.status_code < 300 - -# Check if default realm exists - - -def checkRealmExists(token, realmId): - url = base + '/admin/realms/' + realmId - auth = 'bearer ' + token - headers = { - 'accept': 'application/json', - 'authorization': auth - } - try: - response = requests.get(url, verify=False, headers=headers) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - if response.status_code >= 200 and response.status_code < 300: - return realmId == response.json()['id'] - else: - # sys.exit('Getting realm failed.') - return False - -# create a user in default realm - - -def createUser(token, realmConfig, user): - realmId = realmConfig['id'] - url = base + '/admin/realms/' + realmId + '/users' - auth = 'bearer ' + token - headers = { - 'accept': 'application/json', - 'authorization': auth - } - try: - response = requests.post(url, verify=False, json=user, headers=headers) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - if response.status_code >= 200 and response.status_code < 300: - print('User', user['username'], 'created!') - else: - print('User creation', user['username'], 'failed!\n', response.text) - -# creates User accounts in realm based a file - - -def createUsers(token, realmConfig, authConfig): - for user in authConfig['users']: - createUser(token, realmConfig, user) - - # create a user based on system user - systemUser = { - "firstName": getpass.getuser(), - "lastName": "", - "email": getpass.getuser() + "@sdnr.onap.org", - "enabled": "true", - "username": getpass.getuser(), - "credentials": [ - { - "type": "password", - "value": password, - "temporary": False - } - ] - } - createUser(token, realmConfig, systemUser) - -# Grants a role to a user - - -def addUserRole(user: dict, role: dict, options: dict): - url = options['url'] + '/' + user['id'] + '/role-mappings/realm' - try: - response = requests.post(url, verify=False, json=[ - {'id': role['id'], 'name':role['name']}], headers=options['headers']) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - if response.status_code >= 200 and response.status_code < 300: - print('User role', user['username'], role['name'], 'created!') - else: - print('Creation of user role', - user['username'], role['name'], 'failed!\n', response.text) - -# searches for the role of a given user - - -def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict: - roleName = 'administration' - for grant in authConfig['grants']: - if grant['username'] == username: - roleName = grant['role'] - for role in realmConfig['roles']['realm']: - if role['name'] == roleName: - return role - return None - -# adds roles to users - - -def addUserRoles(token, realmConfig, authConfig): - realmId = realmConfig['id'] - url = base + '/admin/realms/' + realmId + '/users' - auth = 'bearer ' + token - headers = { - 'content-type': 'application/json', - 'accept': 'application/json', - 'authorization': auth - } - try: - response = requests.get(url, verify=False, headers=headers) - except requests.exceptions.Timeout: - sys.exit('HTTP request failed, please check you internet connection.') - except requests.exceptions.TooManyRedirects: - sys.exit('HTTP request failed, please check your proxy settings.') - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - if response.status_code >= 200 and response.status_code < 300: - users = response.json() - options = { - "url": url, - "auth": auth, - "headers": headers - } - for user in users: - role = findRole(user['username'], authConfig, realmConfig) - addUserRole(user, role, options) - else: - sys.exit('Getting users failed.') - -# main - - -(realmFile, authFile, readyTimeout) = load_arguments(sys.argv) -username = get_environment_variable('ADMIN_USERNAME') -password = get_environment_variable('ADMIN_PASSWORD') -base = getBaseUrl() -isReady(readyTimeout) -token = getToken() -if token: - with open(realmFile) as file: - realmConfig = json.load(file) - if not checkRealmExists(token, realmConfig['id']): - createRealm(token, realmConfig) - - with open(authFile) as authConfig: - authConfig = json.load(authConfig) - createUsers(token, realmConfig, authConfig) - addUserRoles(token, realmConfig, authConfig) - exit(0) -exit(1)