X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=solution%2Fintegration%2Fsmo%2Fcommon%2Fidentity%2Fconfig.py;fp=solution%2Fintegration%2Fsmo%2Fcommon%2Fidentity%2Fconfig.py;h=649026c109148cca5c1dadb7405f80ae1ddf3542;hb=a93cb37412d81b4034fd6005d009d6bac412a6d9;hp=0000000000000000000000000000000000000000;hpb=53d47d4c8f037cb93aee17953879c834c648700f;p=oam.git diff --git a/solution/integration/smo/common/identity/config.py b/solution/integration/smo/common/identity/config.py new file mode 100644 index 0000000..649026c --- /dev/null +++ b/solution/integration/smo/common/identity/config.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python +################################################################################ +# Copyright 2021 highstreet technologies GmbH +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# importing the sys, json, requests library +import os +import sys +import json +import requests +import getpass + +# global configurations +# TODO: read from ../.env +base = 'http://localhost:8081' +username = 'admin' +password = 'Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U' +realmFile = os.path.dirname(os.path.abspath(__file__)) + '/o-ran-sc-realm.json' +authFile = os.path.dirname(os.path.abspath(__file__)) + '/authentication.json' + +# Request a token for futher communication +def getToken(): + url = base + '/auth/realms/master/protocol/openid-connect/token' + headers = { + 'content-type': 'application/x-www-form-urlencoded', + 'accept': 'application/json' + } + body = { + 'client_id':'admin-cli', + 'grant_type': 'password', + 'username': username, + 'password': password + } + try: + response = requests.post(url, verify=False, auth=(username, password), data=body, headers=headers) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + if response.status_code >= 200 and response.status_code < 300: + print('Got tocken!') + return response.json()['access_token'] + else: + sys.exit('Getting token failed.') + +# create the default realm from file +def createRealm(token, realm): + url = base + '/auth/admin/realms' + auth = 'bearer ' + token + headers = { + 'content-type': 'application/json', + 'accept': 'application/json', + 'authorization': auth + } + try: + response = requests.post(url, verify=False, json=realm, headers=headers) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + return response.status_code >= 200 and response.status_code < 300 + +# Check if default realm exists +def checkRealmExists(token, realmId): + url = base + '/auth/admin/realms/' + realmId + auth = 'bearer ' + token + headers = { + 'accept': 'application/json', + 'authorization': auth + } + try: + response = requests.get(url, verify=False, headers=headers) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + if response.status_code >= 200 and response.status_code < 300: + return realmId == response.json()['id'] + else: + # sys.exit('Getting realm failed.') + return False + +# create a user in default realm +def createUser(token, realmId, user): + url = base + '/auth/admin/realms/' + realmId + '/users' + auth = 'bearer ' + token + headers = { + 'accept': 'application/json', + 'authorization': auth + } + try: + response = requests.post(url, verify=False, json=user, headers=headers) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + if response.status_code >= 200 and response.status_code < 300: + print('User', user['username'], 'created!') + else: + print('User creation', user['username'], 'failed!\n', response.text) + +# creates User accounts in realm based a file +def createUsers(token, realm, authConfig): + for user in authConfig['users']: + createUser(token, realm, user) + + # create a user based on system user + systemUser = { + "firstName": getpass.getuser(), + "lastName": "", + "email": getpass.getuser() + "@sdnr.onap.org", + "enabled": "true", + "username": getpass.getuser(), + "credentials": [ + { + "type": "password", + "value": password, + "temporary": False + } + ] + } + createUser(token, realm, systemUser) + +# Grants a role to a user +def addUserRole(user, role, options): + url = options['url'] + '/' + user['id'] + '/role-mappings/realm' + try: + response = requests.post(url, verify=False, json=role, headers=options['headers']) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + if response.status_code >= 200 and response.status_code < 300: + print('User role', user['username'], role[0]['name'], 'created!') + else: + print('Creation of user role', user['username'], role[0]['name'], 'failed!\n', response.text) + +# searches for the role of a given user +def findRole(user, authConfig): + roleName='administration' + for grant in authConfig['grants']: + if grant['username'] == user: + roleName= grant['role'] + role=authConfig['roles'][roleName] + return role + +# adds roles to users +def addUserRoles(token, realmId, authConfig): + url = base + '/auth/admin/realms/' + realmId + '/users' + auth = 'bearer ' + token + headers = { + 'content-type': 'application/json', + 'accept': 'application/json', + 'authorization': auth + } + try: + response = requests.get(url, verify=False, headers=headers) + except requests.exceptions.Timeout: + sys.exit('HTTP request failed, please check you internet connection.') + except requests.exceptions.TooManyRedirects: + sys.exit('HTTP request failed, please check your proxy settings.') + except requests.exceptions.RequestException as e: + # catastrophic error. bail. + raise SystemExit(e) + + if response.status_code >= 200 and response.status_code < 300: + users = response.json() + options = { + "url": url, + "auth": auth, + "headers": headers + } + for user in users: + role=findRole(user['username'], authConfig) + addUserRole(user, role, options) + else: + sys.exit('Getting users failed.') + +# main +token = getToken() +if token: + with open(realmFile) as file: + realm = json.load(file) + if not checkRealmExists(token, realm['id']): + createRealm(token, realm) + + with open(authFile) as authConfig: + auth = json.load(authConfig) + createUsers(token, realm['id'], auth); + addUserRoles(token, realm['id'], auth)