import os
import sys
import json
-import requests
+import time
import getpass
-
+import requests
+import warnings
+from jproperties import Properties
+from typing import List
+warnings.filterwarnings('ignore', message='Unverified HTTPS request')
# global configurations
-# TODO: read from ../.env
-base = 'https://identity:8463'
-username = 'admin'
-password = 'Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U'
-realmFile = os.path.dirname(os.path.abspath(__file__)) + '/o-ran-sc-realm.json'
-authFile = os.path.dirname(os.path.abspath(__file__)) + '/authentication.json'
-
-# Request a token for futher communication
+
+
+def get_environment_variable(name):
+ configs = Properties()
+ env_file = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env'
+ with open(env_file, "rb") as read_prop:
+ configs.load(read_prop)
+ return configs.get(name).data
+
+
+def load_arguments(args: List[str]) -> tuple:
+ realm_file = os.path.dirname(os.path.abspath(
+ __file__)) + '/o-ran-sc-realm.json'
+ auth_file = os.path.dirname(os.path.abspath(
+ __file__)) + '/authentication.json'
+ ready_timeout = 180
+ args.pop(0)
+ while len(args) > 0:
+ arg = args.pop(0)
+ if arg == '--auth' and len(args) > 0:
+ auth_file = args.pop(0)
+ print('overwriting auth file: {}'.format(auth_file))
+ elif arg == '--realm' and len(args) > 0:
+ realm_file = args.pop(0)
+ print('overwriting realm file: {}'.format(realm_file))
+ elif arg == '--timeout' and len(args) > 0:
+ ready_timeout = int(args.pop(0))
+ print('waiting for ready {} seconds'.format(ready_timeout))
+
+ return (realm_file, auth_file, ready_timeout)
+
+
+def isReady(timeoutSeconds=180):
+ url = getBaseUrl()
+ while timeoutSeconds > 0:
+ try:
+ response = requests.get(url, verify=False, headers={})
+ except:
+ pass
+ if response is not None and response.status_code == 200:
+ return True
+ time.sleep(1)
+ timeoutSeconds -= 1
+ return False
+
+
+def getBaseUrl():
+ return get_environment_variable("IDENTITY_PROVIDER_URL")
+
+# Request a token for further communication
def getToken():
- url = base + '/auth/realms/master/protocol/openid-connect/token'
+ url = base + '/realms/master/protocol/openid-connect/token'
headers = {
'content-type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
}
body = {
- 'client_id':'admin-cli',
- 'grant_type': 'password',
- 'username': username,
- 'password': password
+ 'client_id': 'admin-cli',
+ 'grant_type': 'password',
+ 'username': username,
+ 'password': password
}
try:
- response = requests.post(url, verify=False, auth=(username, password), data=body, headers=headers)
+ response = requests.post(url, verify=False, auth=(
+ username, password), data=body, headers=headers)
except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
+ sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
+ sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
+ # catastrophic error. bail.
+ raise SystemExit(e)
if response.status_code >= 200 and response.status_code < 300:
- print('Got token!')
- return response.json()['access_token']
+ print('Got token!')
+ return response.json()['access_token']
else:
- sys.exit('Getting token failed.')
+ sys.exit('Getting token failed.')
# create the default realm from file
+
+
def createRealm(token, realm):
- url = base + '/auth/admin/realms'
- auth = 'bearer ' + token
- headers = {
- 'content-type': 'application/json',
- 'accept': 'application/json',
- 'authorization': auth
- }
- try:
- response = requests.post(url, verify=False, json=realm, headers=headers)
- except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
- except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
- except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
-
- return response.status_code >= 200 and response.status_code < 300
+ url = base + '/admin/realms'
+ auth = 'bearer ' + token
+ headers = {
+ 'content-type': 'application/json',
+ 'accept': 'application/json',
+ 'authorization': auth
+ }
+ try:
+ response = requests.post(
+ url, verify=False, json=realm, headers=headers)
+ except requests.exceptions.Timeout:
+ sys.exit('HTTP request failed, please check you internet connection.')
+ except requests.exceptions.TooManyRedirects:
+ sys.exit('HTTP request failed, please check your proxy settings.')
+ except requests.exceptions.RequestException as e:
+ # catastrophic error. bail.
+ raise SystemExit(e)
+
+ return response.status_code >= 200 and response.status_code < 300
# Check if default realm exists
+
+
def checkRealmExists(token, realmId):
- url = base + '/auth/admin/realms/' + realmId
- auth = 'bearer ' + token
- headers = {
- 'accept': 'application/json',
- 'authorization': auth
- }
- try:
- response = requests.get(url, verify=False, headers=headers)
- except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
- except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
- except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
-
- if response.status_code >= 200 and response.status_code < 300:
- return realmId == response.json()['id']
- else:
- # sys.exit('Getting realm failed.')
- return False
+ url = base + '/admin/realms/' + realmId
+ auth = 'bearer ' + token
+ headers = {
+ 'accept': 'application/json',
+ 'authorization': auth
+ }
+ try:
+ response = requests.get(url, verify=False, headers=headers)
+ except requests.exceptions.Timeout:
+ sys.exit('HTTP request failed, please check you internet connection.')
+ except requests.exceptions.TooManyRedirects:
+ sys.exit('HTTP request failed, please check your proxy settings.')
+ except requests.exceptions.RequestException as e:
+ # catastrophic error. bail.
+ raise SystemExit(e)
+
+ if response.status_code >= 200 and response.status_code < 300:
+ return realmId == response.json()['id']
+ else:
+ # sys.exit('Getting realm failed.')
+ return False
# create a user in default realm
-def createUser(token, realmId, user):
- url = base + '/auth/admin/realms/' + realmId + '/users'
- auth = 'bearer ' + token
- headers = {
- 'accept': 'application/json',
- 'authorization': auth
- }
- try:
- response = requests.post(url, verify=False, json=user, headers=headers)
- except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
- except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
- except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
-
- if response.status_code >= 200 and response.status_code < 300:
- print('User', user['username'], 'created!')
- else:
- print('User creation', user['username'], 'failed!\n', response.text)
+
+
+def createUser(token, realmConfig, user):
+ realmId = realmConfig['id']
+ url = base + '/admin/realms/' + realmId + '/users'
+ auth = 'bearer ' + token
+ headers = {
+ 'accept': 'application/json',
+ 'authorization': auth
+ }
+ try:
+ response = requests.post(url, verify=False, json=user, headers=headers)
+ except requests.exceptions.Timeout:
+ sys.exit('HTTP request failed, please check you internet connection.')
+ except requests.exceptions.TooManyRedirects:
+ sys.exit('HTTP request failed, please check your proxy settings.')
+ except requests.exceptions.RequestException as e:
+ # catastrophic error. bail.
+ raise SystemExit(e)
+
+ if response.status_code >= 200 and response.status_code < 300:
+ print('User', user['username'], 'created!')
+ else:
+ print('User creation', user['username'], 'failed!\n', response.text)
# creates User accounts in realm based a file
-def createUsers(token, realm, authConfig):
- for user in authConfig['users']:
- createUser(token, realm, user)
-
- # create a user based on system user
- systemUser = {
- "firstName": getpass.getuser(),
- "lastName": "",
- "email": getpass.getuser() + "@sdnr.onap.org",
- "enabled": "true",
- "username": getpass.getuser(),
- "credentials": [
- {
- "type": "password",
- "value": password,
- "temporary": False
- }
- ]
- }
- createUser(token, realm, systemUser)
+
+
+def createUsers(token, realmConfig, authConfig):
+ for user in authConfig['users']:
+ createUser(token, realmConfig, user)
+
+ # create a user based on system user
+ systemUser = {
+ "firstName": getpass.getuser(),
+ "lastName": "",
+ "email": getpass.getuser() + "@sdnr.onap.org",
+ "enabled": "true",
+ "username": getpass.getuser(),
+ "credentials": [
+ {
+ "type": "password",
+ "value": password,
+ "temporary": False
+ }
+ ]
+ }
+ createUser(token, realmConfig, systemUser)
# Grants a role to a user
-def addUserRole(user, role, options):
- url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
- try:
- response = requests.post(url, verify=False, json=role, headers=options['headers'])
- except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
- except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
- except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
-
- if response.status_code >= 200 and response.status_code < 300:
- print('User role', user['username'], role[0]['name'], 'created!')
- else:
- print('Creation of user role', user['username'], role[0]['name'], 'failed!\n', response.text)
+
+
+def addUserRole(user: dict, role: dict, options: dict):
+ url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
+ try:
+ response = requests.post(url, verify=False, json=[
+ {'id': role['id'], 'name':role['name']}], headers=options['headers'])
+ except requests.exceptions.Timeout:
+ sys.exit('HTTP request failed, please check you internet connection.')
+ except requests.exceptions.TooManyRedirects:
+ sys.exit('HTTP request failed, please check your proxy settings.')
+ except requests.exceptions.RequestException as e:
+ # catastrophic error. bail.
+ raise SystemExit(e)
+
+ if response.status_code >= 200 and response.status_code < 300:
+ print('User role', user['username'], role['name'], 'created!')
+ else:
+ print('Creation of user role',
+ user['username'], role['name'], 'failed!\n', response.text)
# searches for the role of a given user
-def findRole(user, authConfig):
- roleName='administration'
- for grant in authConfig['grants']:
- if grant['username'] == user:
- roleName= grant['role']
- role=authConfig['roles'][roleName]
- return role
+
+
+def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
+ roleName = 'administration'
+ for grant in authConfig['grants']:
+ if grant['username'] == username:
+ roleName = grant['role']
+ for role in realmConfig['roles']['realm']:
+ if role['name'] == roleName:
+ return role
+ return None
# adds roles to users
-def addUserRoles(token, realmId, authConfig):
- url = base + '/auth/admin/realms/' + realmId + '/users'
- auth = 'bearer ' + token
- headers = {
- 'content-type': 'application/json',
- 'accept': 'application/json',
- 'authorization': auth
- }
- try:
- response = requests.get(url, verify=False, headers=headers)
- except requests.exceptions.Timeout:
- sys.exit('HTTP request failed, please check you internet connection.')
- except requests.exceptions.TooManyRedirects:
- sys.exit('HTTP request failed, please check your proxy settings.')
- except requests.exceptions.RequestException as e:
- # catastrophic error. bail.
- raise SystemExit(e)
-
- if response.status_code >= 200 and response.status_code < 300:
- users = response.json()
- options = {
- "url": url,
- "auth": auth,
- "headers": headers
+
+
+def addUserRoles(token, realmConfig, authConfig):
+ realmId = realmConfig['id']
+ url = base + '/admin/realms/' + realmId + '/users'
+ auth = 'bearer ' + token
+ headers = {
+ 'content-type': 'application/json',
+ 'accept': 'application/json',
+ 'authorization': auth
}
- for user in users:
- role=findRole(user['username'], authConfig)
- addUserRole(user, role, options)
- else:
- sys.exit('Getting users failed.')
+ try:
+ response = requests.get(url, verify=False, headers=headers)
+ except requests.exceptions.Timeout:
+ sys.exit('HTTP request failed, please check you internet connection.')
+ except requests.exceptions.TooManyRedirects:
+ sys.exit('HTTP request failed, please check your proxy settings.')
+ except requests.exceptions.RequestException as e:
+ # catastrophic error. bail.
+ raise SystemExit(e)
+
+ if response.status_code >= 200 and response.status_code < 300:
+ users = response.json()
+ options = {
+ "url": url,
+ "auth": auth,
+ "headers": headers
+ }
+ for user in users:
+ role = findRole(user['username'], authConfig, realmConfig)
+ addUserRole(user, role, options)
+ else:
+ sys.exit('Getting users failed.')
# main
+
+
+(realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
+username = get_environment_variable('ADMIN_USERNAME')
+password = get_environment_variable('ADMIN_PASSWORD')
+base = getBaseUrl()
+isReady(readyTimeout)
token = getToken()
if token:
- with open(realmFile) as file:
- realm = json.load(file)
- if not checkRealmExists(token, realm['id']):
- createRealm(token, realm)
-
- with open(authFile) as authConfig:
- auth = json.load(authConfig)
- createUsers(token, realm['id'], auth);
- addUserRoles(token, realm['id'], auth)
+ with open(realmFile) as file:
+ realmConfig = json.load(file)
+ if not checkRealmExists(token, realmConfig['id']):
+ createRealm(token, realmConfig)
+
+ with open(authFile) as authConfig:
+ authConfig = json.load(authConfig)
+ createUsers(token, realmConfig, authConfig)
+ addUserRoles(token, realmConfig, authConfig)
+ exit(0)
+exit(1)