Simplify identity configuration 25/9025/1
authordemx8as6 <martin.skorupski@highstreet-technologies.com>
Thu, 15 Sep 2022 12:19:17 +0000 (14:19 +0200)
committerdemx8as6 <martin.skorupski@highstreet-technologies.com>
Thu, 15 Sep 2022 12:19:25 +0000 (14:19 +0200)
- read user roles from keycloak when creating user with config.py

IssueID: OAM-299
Change-Id: I8a8b4a47dc629e53bcc5c219109d8ad3cbe9e489
Signed-off-by: demx8as6 <martin.skorupski@highstreet-technologies.com>
solution/integration/smo/common/identity/config.py

index 9c1ec28..1ff53ac 100644 (file)
 import os
 import sys
 import json
-import requests
+import time
 import getpass
-
+import requests
+import warnings
+from jproperties import Properties
+from typing import List
+warnings.filterwarnings('ignore', message='Unverified HTTPS request')
 # global configurations
-# TODO: read from ../.env
-base = 'https://identity:8463'
-username = 'admin'
-password = 'Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U'
-realmFile = os.path.dirname(os.path.abspath(__file__)) + '/o-ran-sc-realm.json'
-authFile = os.path.dirname(os.path.abspath(__file__)) + '/authentication.json'
-
-# Request a token for futher communication
+
+
+def get_environment_variable(name):
+    configs = Properties()
+    env_file = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env'
+    with open(env_file, "rb") as read_prop:
+        configs.load(read_prop)
+    return configs.get(name).data
+
+
+def load_arguments(args: List[str]) -> tuple:
+    realm_file = os.path.dirname(os.path.abspath(
+        __file__)) + '/o-ran-sc-realm.json'
+    auth_file = os.path.dirname(os.path.abspath(
+        __file__)) + '/authentication.json'
+    ready_timeout = 180
+    args.pop(0)
+    while len(args) > 0:
+        arg = args.pop(0)
+        if arg == '--auth' and len(args) > 0:
+            auth_file = args.pop(0)
+            print('overwriting auth file: {}'.format(auth_file))
+        elif arg == '--realm' and len(args) > 0:
+            realm_file = args.pop(0)
+            print('overwriting realm file: {}'.format(realm_file))
+        elif arg == '--timeout' and len(args) > 0:
+            ready_timeout = int(args.pop(0))
+            print('waiting for ready {} seconds'.format(ready_timeout))
+
+    return (realm_file, auth_file, ready_timeout)
+
+
+def isReady(timeoutSeconds=180):
+    url = getBaseUrl()
+    while timeoutSeconds > 0:
+        try:
+            response = requests.get(url, verify=False, headers={})
+        except:
+            pass
+        if response is not None and response.status_code == 200:
+            return True
+        time.sleep(1)
+        timeoutSeconds -= 1
+    return False
+
+
+def getBaseUrl():
+    return get_environment_variable("IDENTITY_PROVIDER_URL")
+
+# Request a token for further communication
 def getToken():
-    url = base + '/auth/realms/master/protocol/openid-connect/token'
+    url = base + '/realms/master/protocol/openid-connect/token'
     headers = {
         'content-type': 'application/x-www-form-urlencoded',
         'accept': 'application/json'
     }
     body = {
-      'client_id':'admin-cli',
-      'grant_type': 'password',
-      'username': username,
-      'password': password
+        'client_id': 'admin-cli',
+        'grant_type': 'password',
+        'username': username,
+        'password': password
     }
     try:
-      response = requests.post(url, verify=False, auth=(username, password), data=body, headers=headers)
+        response = requests.post(url, verify=False, auth=(
+            username, password), data=body, headers=headers)
     except requests.exceptions.Timeout:
-      sys.exit('HTTP request failed, please check you internet connection.')
+        sys.exit('HTTP request failed, please check you internet connection.')
     except requests.exceptions.TooManyRedirects:
-      sys.exit('HTTP request failed, please check your proxy settings.')
+        sys.exit('HTTP request failed, please check your proxy settings.')
     except requests.exceptions.RequestException as e:
-      # catastrophic error. bail.
-      raise SystemExit(e)
+        # catastrophic error. bail.
+        raise SystemExit(e)
 
     if response.status_code >= 200 and response.status_code < 300:
-      print('Got token!')
-      return response.json()['access_token']
+        print('Got token!')
+        return response.json()['access_token']
     else:
-      sys.exit('Getting token failed.')
+        sys.exit('Getting token failed.')
 
 # create the default realm from file
+
+
 def createRealm(token, realm):
-  url = base + '/auth/admin/realms'
-  auth = 'bearer ' + token
-  headers = {
-      'content-type': 'application/json',
-      'accept': 'application/json',
-      'authorization': auth
-  }
-  try:
-    response = requests.post(url, verify=False, json=realm, headers=headers)
-  except requests.exceptions.Timeout:
-    sys.exit('HTTP request failed, please check you internet connection.')
-  except requests.exceptions.TooManyRedirects:
-    sys.exit('HTTP request failed, please check your proxy settings.')
-  except requests.exceptions.RequestException as e:
-    # catastrophic error. bail.
-    raise SystemExit(e)
-
-  return response.status_code >= 200 and response.status_code < 300
+    url = base + '/admin/realms'
+    auth = 'bearer ' + token
+    headers = {
+        'content-type': 'application/json',
+        'accept': 'application/json',
+        'authorization': auth
+    }
+    try:
+        response = requests.post(
+            url, verify=False, json=realm, headers=headers)
+    except requests.exceptions.Timeout:
+        sys.exit('HTTP request failed, please check you internet connection.')
+    except requests.exceptions.TooManyRedirects:
+        sys.exit('HTTP request failed, please check your proxy settings.')
+    except requests.exceptions.RequestException as e:
+        # catastrophic error. bail.
+        raise SystemExit(e)
+
+    return response.status_code >= 200 and response.status_code < 300
 
 # Check if default realm exists
+
+
 def checkRealmExists(token, realmId):
-  url = base + '/auth/admin/realms/' + realmId
-  auth = 'bearer ' + token
-  headers = {
-      'accept': 'application/json',
-      'authorization': auth
-  }
-  try:
-      response = requests.get(url, verify=False, headers=headers)
-  except requests.exceptions.Timeout:
-      sys.exit('HTTP request failed, please check you internet connection.')
-  except requests.exceptions.TooManyRedirects:
-      sys.exit('HTTP request failed, please check your proxy settings.')
-  except requests.exceptions.RequestException as e:
-      # catastrophic error. bail.
-      raise SystemExit(e)
-
-  if response.status_code >= 200 and response.status_code < 300:
-    return realmId == response.json()['id']
-  else:
-    # sys.exit('Getting realm failed.')
-    return False
+    url = base + '/admin/realms/' + realmId
+    auth = 'bearer ' + token
+    headers = {
+        'accept': 'application/json',
+        'authorization': auth
+    }
+    try:
+        response = requests.get(url, verify=False, headers=headers)
+    except requests.exceptions.Timeout:
+        sys.exit('HTTP request failed, please check you internet connection.')
+    except requests.exceptions.TooManyRedirects:
+        sys.exit('HTTP request failed, please check your proxy settings.')
+    except requests.exceptions.RequestException as e:
+        # catastrophic error. bail.
+        raise SystemExit(e)
+
+    if response.status_code >= 200 and response.status_code < 300:
+        return realmId == response.json()['id']
+    else:
+        # sys.exit('Getting realm failed.')
+        return False
 
 # create a user in default realm
-def createUser(token, realmId, user):
-  url = base + '/auth/admin/realms/' + realmId + '/users'
-  auth = 'bearer ' + token
-  headers = {
-      'accept': 'application/json',
-      'authorization': auth
-  }
-  try:
-    response = requests.post(url, verify=False, json=user, headers=headers)
-  except requests.exceptions.Timeout:
-    sys.exit('HTTP request failed, please check you internet connection.')
-  except requests.exceptions.TooManyRedirects:
-    sys.exit('HTTP request failed, please check your proxy settings.')
-  except requests.exceptions.RequestException as e:
-    # catastrophic error. bail.
-    raise SystemExit(e)
-
-  if response.status_code >= 200 and response.status_code < 300:
-    print('User', user['username'], 'created!')
-  else:
-    print('User creation', user['username'], 'failed!\n', response.text)
+
+
+def createUser(token, realmConfig, user):
+    realmId = realmConfig['id']
+    url = base + '/admin/realms/' + realmId + '/users'
+    auth = 'bearer ' + token
+    headers = {
+        'accept': 'application/json',
+        'authorization': auth
+    }
+    try:
+        response = requests.post(url, verify=False, json=user, headers=headers)
+    except requests.exceptions.Timeout:
+        sys.exit('HTTP request failed, please check you internet connection.')
+    except requests.exceptions.TooManyRedirects:
+        sys.exit('HTTP request failed, please check your proxy settings.')
+    except requests.exceptions.RequestException as e:
+        # catastrophic error. bail.
+        raise SystemExit(e)
+
+    if response.status_code >= 200 and response.status_code < 300:
+        print('User', user['username'], 'created!')
+    else:
+        print('User creation', user['username'], 'failed!\n', response.text)
 
 # creates User accounts in realm based a file
-def createUsers(token, realm, authConfig):
-  for user in authConfig['users']:
-    createUser(token, realm, user)
-  
-  # create a user based on system user
-  systemUser = {
-    "firstName": getpass.getuser(),
-    "lastName": "",
-    "email": getpass.getuser() + "@sdnr.onap.org",
-    "enabled": "true",
-    "username": getpass.getuser(),
-    "credentials": [
-      {
-        "type": "password",
-        "value": password,
-        "temporary": False
-      }
-    ]
-  }
-  createUser(token, realm, systemUser)
+
+
+def createUsers(token, realmConfig, authConfig):
+    for user in authConfig['users']:
+        createUser(token, realmConfig, user)
+
+    # create a user based on system user
+    systemUser = {
+        "firstName": getpass.getuser(),
+        "lastName": "",
+        "email": getpass.getuser() + "@sdnr.onap.org",
+        "enabled": "true",
+        "username": getpass.getuser(),
+        "credentials": [
+            {
+                "type": "password",
+                "value": password,
+                "temporary": False
+            }
+        ]
+    }
+    createUser(token, realmConfig, systemUser)
 
 # Grants a role to a user
-def addUserRole(user, role, options):
-  url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
-  try:
-    response = requests.post(url, verify=False, json=role, headers=options['headers'])
-  except requests.exceptions.Timeout:
-    sys.exit('HTTP request failed, please check you internet connection.')
-  except requests.exceptions.TooManyRedirects:
-    sys.exit('HTTP request failed, please check your proxy settings.')
-  except requests.exceptions.RequestException as e:
-    # catastrophic error. bail.
-    raise SystemExit(e)
-
-  if response.status_code >= 200 and response.status_code < 300:
-    print('User role', user['username'], role[0]['name'], 'created!')
-  else:
-    print('Creation of user role', user['username'], role[0]['name'], 'failed!\n', response.text)
+
+
+def addUserRole(user: dict, role: dict, options: dict):
+    url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
+    try:
+        response = requests.post(url, verify=False, json=[
+                                 {'id': role['id'], 'name':role['name']}], headers=options['headers'])
+    except requests.exceptions.Timeout:
+        sys.exit('HTTP request failed, please check you internet connection.')
+    except requests.exceptions.TooManyRedirects:
+        sys.exit('HTTP request failed, please check your proxy settings.')
+    except requests.exceptions.RequestException as e:
+        # catastrophic error. bail.
+        raise SystemExit(e)
+
+    if response.status_code >= 200 and response.status_code < 300:
+        print('User role', user['username'], role['name'], 'created!')
+    else:
+        print('Creation of user role',
+              user['username'], role['name'], 'failed!\n', response.text)
 
 # searches for the role of a given user
-def findRole(user, authConfig):
-  roleName='administration'
-  for grant in authConfig['grants']:
-    if grant['username'] == user:
-      roleName= grant['role']
-  role=authConfig['roles'][roleName]
-  return role
+
+
+def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
+    roleName = 'administration'
+    for grant in authConfig['grants']:
+        if grant['username'] == username:
+            roleName = grant['role']
+    for role in realmConfig['roles']['realm']:
+        if role['name'] == roleName:
+            return role
+    return None
 
 # adds roles to users
-def addUserRoles(token, realmId, authConfig):
-  url = base + '/auth/admin/realms/' + realmId + '/users'
-  auth = 'bearer ' + token
-  headers = {
-      'content-type': 'application/json',
-      'accept': 'application/json',
-      'authorization': auth
-  }
-  try:
-      response = requests.get(url, verify=False, headers=headers)
-  except requests.exceptions.Timeout:
-      sys.exit('HTTP request failed, please check you internet connection.')
-  except requests.exceptions.TooManyRedirects:
-      sys.exit('HTTP request failed, please check your proxy settings.')
-  except requests.exceptions.RequestException as e:
-      # catastrophic error. bail.
-      raise SystemExit(e)
-
-  if response.status_code >= 200 and response.status_code < 300:
-    users = response.json()
-    options = {
-      "url": url,
-      "auth": auth,
-      "headers": headers
+
+
+def addUserRoles(token, realmConfig, authConfig):
+    realmId = realmConfig['id']
+    url = base + '/admin/realms/' + realmId + '/users'
+    auth = 'bearer ' + token
+    headers = {
+        'content-type': 'application/json',
+        'accept': 'application/json',
+        'authorization': auth
     }
-    for user in users:
-      role=findRole(user['username'], authConfig)
-      addUserRole(user, role, options)
-  else:
-    sys.exit('Getting users failed.')
+    try:
+        response = requests.get(url, verify=False, headers=headers)
+    except requests.exceptions.Timeout:
+        sys.exit('HTTP request failed, please check you internet connection.')
+    except requests.exceptions.TooManyRedirects:
+        sys.exit('HTTP request failed, please check your proxy settings.')
+    except requests.exceptions.RequestException as e:
+        # catastrophic error. bail.
+        raise SystemExit(e)
+
+    if response.status_code >= 200 and response.status_code < 300:
+        users = response.json()
+        options = {
+            "url": url,
+            "auth": auth,
+            "headers": headers
+        }
+        for user in users:
+            role = findRole(user['username'], authConfig, realmConfig)
+            addUserRole(user, role, options)
+    else:
+        sys.exit('Getting users failed.')
 
 # main
+
+
+(realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
+username = get_environment_variable('ADMIN_USERNAME')
+password = get_environment_variable('ADMIN_PASSWORD')
+base = getBaseUrl()
+isReady(readyTimeout)
 token = getToken()
 if token:
-  with open(realmFile) as file:
-    realm = json.load(file)
-    if not checkRealmExists(token, realm['id']):
-      createRealm(token, realm)
-
-    with open(authFile) as authConfig:
-      auth = json.load(authConfig)
-      createUsers(token, realm['id'], auth);
-    addUserRoles(token, realm['id'], auth)
+    with open(realmFile) as file:
+        realmConfig = json.load(file)
+        if not checkRealmExists(token, realmConfig['id']):
+            createRealm(token, realmConfig)
+
+        with open(authFile) as authConfig:
+            authConfig = json.load(authConfig)
+            createUsers(token, realmConfig, authConfig)
+        addUserRoles(token, realmConfig, authConfig)
+    exit(0)
+exit(1)