Enhance config scripts
[oam.git] / solution / smo / common / identity / config.py
1 #!/usr/bin/env python
2 #############################################################################
3 # Copyright 2023 highstreet technologies GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the 'License');
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an 'AS IS' BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 # importing the sys, json, requests library
19 import os
20 import pathlib
21 import sys
22 import json
23 import time
24 import getpass
25 import requests
26 import re
27 import warnings
28 from jproperties import Properties
29 from typing import List
30 warnings.filterwarnings('ignore', message='Unverified HTTPS request')
31 # global configurations
32
33
34 def get_environment_variable(name):
35     configs = Properties()
36     path = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
37     env_file = str(path.parent.absolute()) + '/.env'
38     with open(env_file, "rb") as read_prop:
39         configs.load(read_prop)
40     value = configs.get(name).data
41
42     regex = r"\$\{([^\}]+)\}"
43     matches = re.finditer(regex, value)
44     while True:
45         match = next(matches, None)
46         if match is None:
47             break
48         inner = get_environment_variable(match.group(1))
49         value = value.replace("${" + match.group(1) + "}", inner )
50     return value
51
52
53 def load_arguments(args: List[str]) -> tuple:
54     realm_file = os.path.dirname(os.path.abspath(
55         __file__)) + '/o-ran-sc-realm.json'
56     auth_file = os.path.dirname(os.path.abspath(
57         __file__)) + '/authentication.json'
58     ready_timeout = 180
59     args.pop(0)
60     while len(args) > 0:
61         arg = args.pop(0)
62         if arg == '--auth' and len(args) > 0:
63             auth_file = args.pop(0)
64             print('overwriting auth file: {}'.format(auth_file))
65         elif arg == '--realm' and len(args) > 0:
66             realm_file = args.pop(0)
67             print('overwriting realm file: {}'.format(realm_file))
68         elif arg == '--timeout' and len(args) > 0:
69             ready_timeout = int(args.pop(0))
70             print('waiting for ready {} seconds'.format(ready_timeout))
71
72     return (realm_file, auth_file, ready_timeout)
73
74
75 def isReady(timeoutSeconds=180):
76     url = getBaseUrl()
77     print(f'url={url}')
78     while timeoutSeconds > 0:
79         try:
80             response = requests.get(url, verify=False, headers={})
81         except:
82             response = None
83         if response is not None and response.status_code == 200:
84             return True
85         time.sleep(1)
86         timeoutSeconds -= 1
87     return False
88
89
90 def getBaseUrl():
91     return get_environment_variable("IDENTITY_PROVIDER_URL")
92
93 # Request a token for further communication
94
95
96 def getToken():
97     url = base + '/realms/master/protocol/openid-connect/token'
98     headers = {
99         'content-type': 'application/x-www-form-urlencoded',
100         'accept': 'application/json'
101     }
102     body = {
103         'client_id': 'admin-cli',
104         'grant_type': 'password',
105         'username': username,
106         'password': password
107     }
108     try:
109         response = requests.post(url, verify=False, auth=(
110             username, password), data=body, headers=headers)
111     except requests.exceptions.Timeout:
112         sys.exit('HTTP request failed, please check you internet connection.')
113     except requests.exceptions.TooManyRedirects:
114         sys.exit('HTTP request failed, please check your proxy settings.')
115     except requests.exceptions.RequestException as e:
116         # catastrophic error. bail.
117         raise SystemExit(e)
118
119     if response.status_code >= 200 and response.status_code < 300:
120         print('Got token!')
121         return response.json()['access_token']
122     else:
123         sys.exit('Getting token failed.')
124
125 # create the default realm from file
126
127
128 def createRealm(token, realm):
129     url = base + '/admin/realms'
130     auth = 'bearer ' + token
131     headers = {
132         'content-type': 'application/json',
133         'accept': 'application/json',
134         'authorization': auth
135     }
136     try:
137         response = requests.post(
138             url, verify=False, json=realm, headers=headers)
139     except requests.exceptions.Timeout:
140         sys.exit('HTTP request failed, please check you internet connection.')
141     except requests.exceptions.TooManyRedirects:
142         sys.exit('HTTP request failed, please check your proxy settings.')
143     except requests.exceptions.RequestException as e:
144         # catastrophic error. bail.
145         raise SystemExit(e)
146
147     return response.status_code >= 200 and response.status_code < 300
148
149 # Check if default realm exists
150
151
152 def checkRealmExists(token, realmId):
153     url = base + '/admin/realms/' + realmId
154     auth = 'bearer ' + token
155     headers = {
156         'accept': 'application/json',
157         'authorization': auth
158     }
159     try:
160         response = requests.get(url, verify=False, headers=headers)
161     except requests.exceptions.Timeout:
162         sys.exit('HTTP request failed, please check you internet connection.')
163     except requests.exceptions.TooManyRedirects:
164         sys.exit('HTTP request failed, please check your proxy settings.')
165     except requests.exceptions.RequestException as e:
166         # catastrophic error. bail.
167         raise SystemExit(e)
168
169     if response.status_code >= 200 and response.status_code < 300:
170         return realmId == response.json()['id']
171     else:
172         # sys.exit('Getting realm failed.')
173         return False
174
175 # create a user in default realm
176
177
178 def createUser(token, realmConfig, user):
179     realmId = realmConfig['id']
180     url = base + '/admin/realms/' + realmId + '/users'
181     auth = 'bearer ' + token
182     headers = {
183         'accept': 'application/json',
184         'authorization': auth
185     }
186     try:
187         response = requests.post(url, verify=False, json=user, headers=headers)
188     except requests.exceptions.Timeout:
189         sys.exit('HTTP request failed, please check you internet connection.')
190     except requests.exceptions.TooManyRedirects:
191         sys.exit('HTTP request failed, please check your proxy settings.')
192     except requests.exceptions.RequestException as e:
193         # catastrophic error. bail.
194         raise SystemExit(e)
195
196     if response.status_code >= 200 and response.status_code < 300:
197         print('User', user['username'], 'created!')
198     else:
199         print('User creation', user['username'], 'failed!\n', response.text)
200
201 # creates User accounts in realm based a file
202
203
204 def createUsers(token, realmConfig, authConfig):
205     for user in authConfig['users']:
206         createUser(token, realmConfig, user)
207
208     # create a user based on system user
209     systemUser = {
210         "firstName": getpass.getuser(),
211         "lastName": "",
212         "email": getpass.getuser() + "@sdnr.onap.org",
213         "enabled": "true",
214         "username": getpass.getuser(),
215         "credentials": [
216             {
217                 "type": "password",
218                 "value": password,
219                 "temporary": True
220             }
221         ],
222         "requiredActions": [
223             "UPDATE_PASSWORD"
224         ]
225     }
226     createUser(token, realmConfig, systemUser)
227
228 # Grants a role to a user
229
230
231 def addUserRole(user: dict, role: dict, options: dict):
232     url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
233     try:
234         response = requests.post(url, verify=False, json=[
235                                  {'id': role['id'], 'name':role['name']}],
236                                  headers=options['headers'])
237     except requests.exceptions.Timeout:
238         sys.exit('HTTP request failed, please check you internet connection.')
239     except requests.exceptions.TooManyRedirects:
240         sys.exit('HTTP request failed, please check your proxy settings.')
241     except requests.exceptions.RequestException as e:
242         # catastrophic error. bail.
243         raise SystemExit(e)
244
245     if response.status_code >= 200 and response.status_code < 300:
246         print('User role', user['username'], role['name'], 'created!')
247     else:
248         print('Creation of user role',
249               user['username'], role['name'], 'failed!\n', response.text)
250
251 # searches for the role of a given user
252
253
254 def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
255     roleName = 'administration'
256     for grant in authConfig['grants']:
257         if grant['username'] == username:
258             roleName = grant['role']
259     for role in realmConfig['roles']['realm']:
260         if role['name'] == roleName:
261             return role
262     return None
263
264 # adds roles to users
265
266
267 def addUserRoles(token, realmConfig, authConfig):
268     realmId = realmConfig['id']
269     url = base + '/admin/realms/' + realmId + '/users'
270     auth = 'bearer ' + token
271     headers = {
272         'content-type': 'application/json',
273         'accept': 'application/json',
274         'authorization': auth
275     }
276     try:
277         response = requests.get(url, verify=False, headers=headers)
278     except requests.exceptions.Timeout:
279         sys.exit('HTTP request failed, please check you internet connection.')
280     except requests.exceptions.TooManyRedirects:
281         sys.exit('HTTP request failed, please check your proxy settings.')
282     except requests.exceptions.RequestException as e:
283         # catastrophic error. bail.
284         raise SystemExit(e)
285
286     if response.status_code >= 200 and response.status_code < 300:
287         users = response.json()
288         options = {
289             "url": url,
290             "auth": auth,
291             "headers": headers
292         }
293         for user in users:
294             role = findRole(user['username'], authConfig, realmConfig)
295             addUserRole(user, role, options)
296     else:
297         sys.exit('Getting users failed.')
298
299 # main
300
301
302 (realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
303 username = get_environment_variable('ADMIN_USERNAME')
304 password = get_environment_variable('ADMIN_PASSWORD')
305 base = getBaseUrl()
306 isReady(readyTimeout)
307 token = getToken()
308 if token:
309     with open(realmFile) as file:
310         realmConfig = json.load(file)
311         if not checkRealmExists(token, realmConfig['id']):
312             createRealm(token, realmConfig)
313
314         with open(authFile) as authConfig:
315             authConfig = json.load(authConfig)
316             createUsers(token, realmConfig, authConfig)
317         addUserRoles(token, realmConfig, authConfig)
318     exit(0)
319 exit(1)