#!/usr/bin/env python
################################################################################
# Copyright 2021 highstreet technologies GmbH
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# importing the sys, json, requests library
import os
import sys
import json
import time
import getpass
import requests
import warnings
from jproperties import Properties
from typing import List, Optional

# Every request in this script is sent with verify=False; silence the
# warning urllib3 would otherwise print for each of them.
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

# Upper bound (seconds) for a single readiness probe, so that one hanging
# connection cannot outlast the overall readiness timeout.
READY_PROBE_TIMEOUT = 5


# global configurations
def get_environment_variable(name):
    """Return the value of property ``name`` from the ``.env`` file.

    The file is looked up one directory above the directory containing this
    script and is re-read on every call.

    Raises:
        KeyError: if ``name`` is not defined in the file.
        OSError: if the file cannot be opened.
    """
    configs = Properties()
    env_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), '..', '.env')
    with open(env_file, "rb") as read_prop:
        configs.load(read_prop)
    entry = configs.get(name)
    if entry is None:
        # Fail with a message naming the missing key instead of an
        # AttributeError on None.
        raise KeyError("'{}' is not defined in {}".format(name, env_file))
    return entry.data


def load_arguments(args: List[str]) -> tuple:
    """Parse the command line and return ``(realm_file, auth_file, ready_timeout)``.

    ``args`` is expected to be shaped like ``sys.argv``: its first element is
    skipped. Recognised options, each followed by a value:

    * ``--auth <file>``     overrides the default ``authentication.json``
    * ``--realm <file>``    overrides the default ``o-ran-sc-realm.json``
    * ``--timeout <secs>``  overrides the default readiness timeout of 180

    Both default files are located next to this script. Unknown arguments,
    and options that are missing their value, are ignored. The list passed in
    is not modified.

    Raises:
        ValueError: if the value given to ``--timeout`` is not an integer.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    realm_file = script_dir + '/o-ran-sc-realm.json'
    auth_file = script_dir + '/authentication.json'
    ready_timeout = 180

    # Work on a copy so that the caller's list (usually sys.argv) is left
    # untouched; the slice also drops the program name.
    remaining = list(args[1:])
    while remaining:
        arg = remaining.pop(0)
        if arg == '--auth' and remaining:
            auth_file = remaining.pop(0)
            print('overwriting auth file: {}'.format(auth_file))
        elif arg == '--realm' and remaining:
            realm_file = remaining.pop(0)
            print('overwriting realm file: {}'.format(realm_file))
        elif arg == '--timeout' and remaining:
            ready_timeout = int(remaining.pop(0))
            print('waiting for ready {} seconds'.format(ready_timeout))

    return (realm_file, auth_file, ready_timeout)


def isReady(timeoutSeconds=180):
    """Poll the base URL until it answers with HTTP 200.

    One probe is sent roughly every second until ``timeoutSeconds`` seconds
    have elapsed. Connection errors and non-200 answers count as "not ready
    yet".

    Returns:
        True as soon as a probe returns status 200, False if the deadline
        passes first (including when ``timeoutSeconds`` is not positive).
    """
    url = getBaseUrl()
    deadline = time.monotonic() + timeoutSeconds
    while time.monotonic() < deadline:
        try:
            response = requests.get(url, verify=False, headers={},
                                    timeout=READY_PROBE_TIMEOUT)
        except requests.exceptions.RequestException:
            # Not reachable yet; reset so that no stale (or unbound) response
            # is inspected below.
            response = None
        if response is not None and response.status_code == 200:
            return True
        time.sleep(1)
    return False


def getBaseUrl():
    """Return the ``IDENTITY_PROVIDER_URL`` property from the ``.env`` file."""
    return get_environment_variable("IDENTITY_PROVIDER_URL")


def _send_request(method, url, **kwargs):
    """Send one HTTP request with certificate verification disabled.

    Any failure to obtain a response terminates the script via SystemExit,
    so callers only ever see a response object.
    """
    try:
        return requests.request(method, url, verify=False, **kwargs)
    except requests.exceptions.Timeout:
        sys.exit('HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        sys.exit('HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        raise SystemExit(e)


def _is_success(response):
    """Return True if the response carries a 2xx status code."""
    return 200 <= response.status_code < 300


# Request a token for further communication
def getToken():
    """Request an access token using the admin credentials.

    Reads the module-level ``base``, ``username`` and ``password``.

    Returns:
        The ``access_token`` field of the JSON answer.

    Raises:
        SystemExit: if the request fails or the answer is not a 2xx status.
    """
    url = base + '/realms/master/protocol/openid-connect/token'
    headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'accept': 'application/json'
    }
    body = {
        'client_id': 'admin-cli',
        'grant_type': 'password',
        'username': username,
        'password': password
    }
    response = _send_request('post', url, auth=(username, password),
                             data=body, headers=headers)

    if _is_success(response):
        print('Got token!')
        return response.json()['access_token']
    sys.exit('Getting token failed.')


# create the default realm from file
def createRealm(token, realm):
    """POST the realm definition ``realm`` to ``<base>/admin/realms``.

    Returns:
        True if the answer has a 2xx status, False otherwise.

    Raises:
        SystemExit: if the request itself fails.
    """
    url = base + '/admin/realms'
    auth = 'bearer ' + token
    headers = {
        'content-type': 'application/json',
        'accept': 'application/json',
        'authorization': auth
    }
    response = _send_request('post', url, json=realm, headers=headers)
    return _is_success(response)


# Check if default realm exists
def checkRealmExists(token, realmId):
    """Return True if a realm with id ``realmId`` can be fetched.

    A non-2xx answer is reported as False rather than as an error.

    Raises:
        SystemExit: if the request itself fails.
    """
    url = base + '/admin/realms/' + realmId
    auth = 'bearer ' + token
    headers = {
        'accept': 'application/json',
        'authorization': auth
    }
    response = _send_request('get', url, headers=headers)

    if _is_success(response):
        return realmId == response.json()['id']
    return False


# create a user in default realm
def createUser(token, realmConfig, user):
    """Create ``user`` in the realm identified by ``realmConfig['id']``.

    The outcome is printed; a rejected creation is not treated as fatal.

    Raises:
        SystemExit: if the request itself fails.
    """
    realmId = realmConfig['id']
    url = base + '/admin/realms/' + realmId + '/users'
    auth = 'bearer ' + token
    headers = {
        'accept': 'application/json',
        'authorization': auth
    }
    response = _send_request('post', url, json=user, headers=headers)

    if _is_success(response):
        print('User', user['username'], 'created!')
    else:
        print('User creation', user['username'], 'failed!\n', response.text)


# creates User accounts in realm based a file
def createUsers(token, realmConfig, authConfig):
    """Create every user listed in ``authConfig['users']`` plus one system user.

    The additional account is named after the operating-system user running
    this script and receives the module-level admin ``password`` as a
    non-temporary credential.
    """
    for user in authConfig['users']:
        createUser(token, realmConfig, user)

    # create a user based on system user
    systemUserName = getpass.getuser()
    systemUser = {
        "firstName": systemUserName,
        "lastName": "",
        "email": systemUserName + "@sdnr.onap.org",
        "enabled": "true",
        "username": systemUserName,
        "credentials": [
            {
                "type": "password",
                "value": password,
                "temporary": False
            }
        ]
    }
    createUser(token, realmConfig, systemUser)


# Grants a role to a user
def addUserRole(user: dict, role: dict, options: dict):
    """Map the realm role ``role`` to ``user``.

    ``options['url']`` is the users collection URL and ``options['headers']``
    the request headers to send. The outcome is printed; a rejected mapping
    is not treated as fatal.

    Raises:
        SystemExit: if the request itself fails.
    """
    url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
    payload = [{'id': role['id'], 'name': role['name']}]
    response = _send_request('post', url, json=payload,
                             headers=options['headers'])

    if _is_success(response):
        print('User role', user['username'], role['name'], 'created!')
    else:
        print('Creation of user role',
              user['username'], role['name'], 'failed!\n', response.text)


# searches for the role of a given user
def findRole(username: str, authConfig: dict, realmConfig: dict) -> Optional[dict]:
    """Return the realm role definition that applies to ``username``.

    The role name defaults to ``'administration'`` and is overridden by an
    entry in ``authConfig['grants']`` for that user; if several entries match,
    the last one wins. The name is then resolved against
    ``realmConfig['roles']['realm']``.

    Returns:
        The matching role dict, or None if the realm defines no such role.
    """
    roleName = 'administration'
    for grant in authConfig['grants']:
        if grant['username'] == username:
            roleName = grant['role']
    for role in realmConfig['roles']['realm']:
        if role['name'] == roleName:
            return role
    return None


# adds roles to users
def addUserRoles(token, realmConfig, authConfig):
    """Assign a realm role to every user returned by the realm's users endpoint.

    Users whose role cannot be resolved by ``findRole`` are reported and
    skipped.

    Raises:
        SystemExit: if the user list cannot be retrieved.
    """
    realmId = realmConfig['id']
    url = base + '/admin/realms/' + realmId + '/users'
    auth = 'bearer ' + token
    headers = {
        'content-type': 'application/json',
        'accept': 'application/json',
        'authorization': auth
    }
    response = _send_request('get', url, headers=headers)

    if not _is_success(response):
        sys.exit('Getting users failed.')

    users = response.json()
    options = {
        "url": url,
        "auth": auth,
        "headers": headers
    }
    for user in users:
        role = findRole(user['username'], authConfig, realmConfig)
        if role is None:
            # Without a role definition there is nothing to map; carry on
            # with the remaining users instead of crashing.
            print('No role found for user', user['username'])
            continue
        addUserRole(user, role, options)


# main
if __name__ == '__main__':
    (realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
    # The functions above read these three names as module globals.
    username = get_environment_variable('ADMIN_USERNAME')
    password = get_environment_variable('ADMIN_PASSWORD')
    base = getBaseUrl()

    if not isReady(readyTimeout):
        # Keep going: getToken() terminates the script if the service is
        # still unreachable.
        print('Identity provider not ready after {} seconds.'.format(
            readyTimeout))

    token = getToken()
    if token:
        with open(realmFile) as realmHandle:
            realmConfig = json.load(realmHandle)
        if not checkRealmExists(token, realmConfig['id']):
            if not createRealm(token, realmConfig):
                print('Realm creation', realmConfig['id'], 'failed!')

        with open(authFile) as authHandle:
            authConfig = json.load(authHandle)
        createUsers(token, realmConfig, authConfig)
        addUserRoles(token, realmConfig, authConfig)
        sys.exit(0)
    sys.exit(1)