2 #############################################################################
3 # Copyright 2023 highstreet technologies GmbH
5 # Licensed under the Apache License, Version 2.0 (the 'License');
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an 'AS IS' BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 # importing the sys, json, requests library
28 from jproperties import Properties
29 from typing import List
30 warnings.filterwarnings('ignore', message='Unverified HTTPS request')
31 # global configurations
34 def get_environment_variable(name):
35 configs = Properties()
36 path = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
37 env_file = str(path.parent.absolute()) + '/.env'
38 with open(env_file, "rb") as read_prop:
39 configs.load(read_prop)
40 value = configs.get(name).data
42 regex = r"\$\{([^\}]+)\}"
43 matches = re.finditer(regex, value)
45 match = next(matches, None)
48 inner = get_environment_variable(match.group(1))
49 value = value.replace("${" + match.group(1) + "}", inner )
53 def load_arguments(args: List[str]) -> tuple:
54 realm_file = os.path.dirname(os.path.abspath(
55 __file__)) + '/o-ran-sc-realm.json'
56 auth_file = os.path.dirname(os.path.abspath(
57 __file__)) + '/authentication.json'
62 if arg == '--auth' and len(args) > 0:
63 auth_file = args.pop(0)
64 print('overwriting auth file: {}'.format(auth_file))
65 elif arg == '--realm' and len(args) > 0:
66 realm_file = args.pop(0)
67 print('overwriting realm file: {}'.format(realm_file))
68 elif arg == '--timeout' and len(args) > 0:
69 ready_timeout = int(args.pop(0))
70 print('waiting for ready {} seconds'.format(ready_timeout))
72 return (realm_file, auth_file, ready_timeout)
75 def isReady(timeoutSeconds=180):
78 while timeoutSeconds > 0:
80 response = requests.get(url, verify=False, headers={})
83 if response is not None and response.status_code == 200:
91 return get_environment_variable("IDENTITY_PROVIDER_URL")
93 # Request a token for further communication
97 url = base + '/realms/master/protocol/openid-connect/token'
99 'content-type': 'application/x-www-form-urlencoded',
100 'accept': 'application/json'
103 'client_id': 'admin-cli',
104 'grant_type': 'password',
105 'username': username,
109 response = requests.post(url, verify=False, auth=(
110 username, password), data=body, headers=headers)
111 except requests.exceptions.Timeout:
112 sys.exit('HTTP request failed, please check you internet connection.')
113 except requests.exceptions.TooManyRedirects:
114 sys.exit('HTTP request failed, please check your proxy settings.')
115 except requests.exceptions.RequestException as e:
116 # catastrophic error. bail.
119 if response.status_code >= 200 and response.status_code < 300:
121 return response.json()['access_token']
123 sys.exit('Getting token failed.')
125 # create the default realm from file
128 def createRealm(token, realm):
129 url = base + '/admin/realms'
130 auth = 'bearer ' + token
132 'content-type': 'application/json',
133 'accept': 'application/json',
134 'authorization': auth
137 response = requests.post(
138 url, verify=False, json=realm, headers=headers)
139 except requests.exceptions.Timeout:
140 sys.exit('HTTP request failed, please check you internet connection.')
141 except requests.exceptions.TooManyRedirects:
142 sys.exit('HTTP request failed, please check your proxy settings.')
143 except requests.exceptions.RequestException as e:
144 # catastrophic error. bail.
147 return response.status_code >= 200 and response.status_code < 300
149 # Check if default realm exists
152 def checkRealmExists(token, realmId):
153 url = base + '/admin/realms/' + realmId
154 auth = 'bearer ' + token
156 'accept': 'application/json',
157 'authorization': auth
160 response = requests.get(url, verify=False, headers=headers)
161 except requests.exceptions.Timeout:
162 sys.exit('HTTP request failed, please check you internet connection.')
163 except requests.exceptions.TooManyRedirects:
164 sys.exit('HTTP request failed, please check your proxy settings.')
165 except requests.exceptions.RequestException as e:
166 # catastrophic error. bail.
169 if response.status_code >= 200 and response.status_code < 300:
170 return realmId == response.json()['id']
172 # sys.exit('Getting realm failed.')
175 # create a user in default realm
178 def createUser(token, realmConfig, user):
179 realmId = realmConfig['id']
180 url = base + '/admin/realms/' + realmId + '/users'
181 auth = 'bearer ' + token
183 'accept': 'application/json',
184 'authorization': auth
187 response = requests.post(url, verify=False, json=user, headers=headers)
188 except requests.exceptions.Timeout:
189 sys.exit('HTTP request failed, please check you internet connection.')
190 except requests.exceptions.TooManyRedirects:
191 sys.exit('HTTP request failed, please check your proxy settings.')
192 except requests.exceptions.RequestException as e:
193 # catastrophic error. bail.
196 if response.status_code >= 200 and response.status_code < 300:
197 print('User', user['username'], 'created!')
199 print('User creation', user['username'], 'failed!\n', response.text)
201 # creates User accounts in realm based a file
204 def createUsers(token, realmConfig, authConfig):
205 for user in authConfig['users']:
206 createUser(token, realmConfig, user)
208 # create a user based on system user
210 "firstName": getpass.getuser(),
212 "email": getpass.getuser() + "@sdnr.onap.org",
214 "username": getpass.getuser(),
226 createUser(token, realmConfig, systemUser)
228 # Grants a role to a user
231 def addUserRole(user: dict, role: dict, options: dict):
232 url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
234 response = requests.post(url, verify=False, json=[
235 {'id': role['id'], 'name':role['name']}],
236 headers=options['headers'])
237 except requests.exceptions.Timeout:
238 sys.exit('HTTP request failed, please check you internet connection.')
239 except requests.exceptions.TooManyRedirects:
240 sys.exit('HTTP request failed, please check your proxy settings.')
241 except requests.exceptions.RequestException as e:
242 # catastrophic error. bail.
245 if response.status_code >= 200 and response.status_code < 300:
246 print('User role', user['username'], role['name'], 'created!')
248 print('Creation of user role',
249 user['username'], role['name'], 'failed!\n', response.text)
251 # searches for the role of a given user
254 def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
255 roleName = 'administration'
256 for grant in authConfig['grants']:
257 if grant['username'] == username:
258 roleName = grant['role']
259 for role in realmConfig['roles']['realm']:
260 if role['name'] == roleName:
264 # adds roles to users
267 def addUserRoles(token, realmConfig, authConfig):
268 realmId = realmConfig['id']
269 url = base + '/admin/realms/' + realmId + '/users'
270 auth = 'bearer ' + token
272 'content-type': 'application/json',
273 'accept': 'application/json',
274 'authorization': auth
277 response = requests.get(url, verify=False, headers=headers)
278 except requests.exceptions.Timeout:
279 sys.exit('HTTP request failed, please check you internet connection.')
280 except requests.exceptions.TooManyRedirects:
281 sys.exit('HTTP request failed, please check your proxy settings.')
282 except requests.exceptions.RequestException as e:
283 # catastrophic error. bail.
286 if response.status_code >= 200 and response.status_code < 300:
287 users = response.json()
294 role = findRole(user['username'], authConfig, realmConfig)
295 addUserRole(user, role, options)
297 sys.exit('Getting users failed.')
302 (realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
303 username = get_environment_variable('ADMIN_USERNAME')
304 password = get_environment_variable('ADMIN_PASSWORD')
306 isReady(readyTimeout)
309 with open(realmFile) as file:
310 realmConfig = json.load(file)
311 if not checkRealmExists(token, realmConfig['id']):
312 createRealm(token, realmConfig)
314 with open(authFile) as authConfig:
315 authConfig = json.load(authConfig)
316 createUsers(token, realmConfig, authConfig)
317 addUserRoles(token, realmConfig, authConfig)