Correct getBaseUrl method
[oam.git] / solution / smo / common / identity / config.py
1 #!/usr/bin/env python
2 #############################################################################
3 # Copyright 2023 highstreet technologies GmbH
4 #
5 # Licensed under the Apache License, Version 2.0 (the 'License');
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an 'AS IS' BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 # importing the sys, json, requests library
19 import os
20 import pathlib
21 import sys
22 import json
23 import time
24 import getpass
25 import requests
26 import warnings
27 from jproperties import Properties
28 from typing import List
29 warnings.filterwarnings('ignore', message='Unverified HTTPS request')
30 # global configurations
31
32
33 def get_environment_variable(name):
34     configs = Properties()
35     path = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
36     env_file = str(path.parent.absolute()) + '/.env'
37     with open(env_file, "rb") as read_prop:
38         configs.load(read_prop)
39     return configs.get(name).data
40
41
42 def load_arguments(args: List[str]) -> tuple:
43     realm_file = os.path.dirname(os.path.abspath(
44         __file__)) + '/o-ran-sc-realm.json'
45     auth_file = os.path.dirname(os.path.abspath(
46         __file__)) + '/authentication.json'
47     ready_timeout = 180
48     args.pop(0)
49     while len(args) > 0:
50         arg = args.pop(0)
51         if arg == '--auth' and len(args) > 0:
52             auth_file = args.pop(0)
53             print('overwriting auth file: {}'.format(auth_file))
54         elif arg == '--realm' and len(args) > 0:
55             realm_file = args.pop(0)
56             print('overwriting realm file: {}'.format(realm_file))
57         elif arg == '--timeout' and len(args) > 0:
58             ready_timeout = int(args.pop(0))
59             print('waiting for ready {} seconds'.format(ready_timeout))
60
61     return (realm_file, auth_file, ready_timeout)
62
63
64 def isReady(timeoutSeconds=180):
65     url = getBaseUrl()
66     while timeoutSeconds > 0:
67         try:
68             response = requests.get(url, verify=False, headers={})
69         except:
70             response = None
71         if response is not None and response.status_code == 200:
72             return True
73         time.sleep(1)
74         timeoutSeconds -= 1
75     return False
76
77
78 def getBaseUrl():
79     return get_environment_variable("IDENTITY_PROVIDER_URL")
80
81 # Request a token for further communication
82
83
84 def getToken():
85     url = base + '/realms/master/protocol/openid-connect/token'
86     headers = {
87         'content-type': 'application/x-www-form-urlencoded',
88         'accept': 'application/json'
89     }
90     body = {
91         'client_id': 'admin-cli',
92         'grant_type': 'password',
93         'username': username,
94         'password': password
95     }
96     try:
97         response = requests.post(url, verify=False, auth=(
98             username, password), data=body, headers=headers)
99     except requests.exceptions.Timeout:
100         sys.exit('HTTP request failed, please check you internet connection.')
101     except requests.exceptions.TooManyRedirects:
102         sys.exit('HTTP request failed, please check your proxy settings.')
103     except requests.exceptions.RequestException as e:
104         # catastrophic error. bail.
105         raise SystemExit(e)
106
107     if response.status_code >= 200 and response.status_code < 300:
108         print('Got token!')
109         return response.json()['access_token']
110     else:
111         sys.exit('Getting token failed.')
112
113 # create the default realm from file
114
115
116 def createRealm(token, realm):
117     url = base + '/admin/realms'
118     auth = 'bearer ' + token
119     headers = {
120         'content-type': 'application/json',
121         'accept': 'application/json',
122         'authorization': auth
123     }
124     try:
125         response = requests.post(
126             url, verify=False, json=realm, headers=headers)
127     except requests.exceptions.Timeout:
128         sys.exit('HTTP request failed, please check you internet connection.')
129     except requests.exceptions.TooManyRedirects:
130         sys.exit('HTTP request failed, please check your proxy settings.')
131     except requests.exceptions.RequestException as e:
132         # catastrophic error. bail.
133         raise SystemExit(e)
134
135     return response.status_code >= 200 and response.status_code < 300
136
137 # Check if default realm exists
138
139
140 def checkRealmExists(token, realmId):
141     url = base + '/admin/realms/' + realmId
142     auth = 'bearer ' + token
143     headers = {
144         'accept': 'application/json',
145         'authorization': auth
146     }
147     try:
148         response = requests.get(url, verify=False, headers=headers)
149     except requests.exceptions.Timeout:
150         sys.exit('HTTP request failed, please check you internet connection.')
151     except requests.exceptions.TooManyRedirects:
152         sys.exit('HTTP request failed, please check your proxy settings.')
153     except requests.exceptions.RequestException as e:
154         # catastrophic error. bail.
155         raise SystemExit(e)
156
157     if response.status_code >= 200 and response.status_code < 300:
158         return realmId == response.json()['id']
159     else:
160         # sys.exit('Getting realm failed.')
161         return False
162
163 # create a user in default realm
164
165
166 def createUser(token, realmConfig, user):
167     realmId = realmConfig['id']
168     url = base + '/admin/realms/' + realmId + '/users'
169     auth = 'bearer ' + token
170     headers = {
171         'accept': 'application/json',
172         'authorization': auth
173     }
174     try:
175         response = requests.post(url, verify=False, json=user, headers=headers)
176     except requests.exceptions.Timeout:
177         sys.exit('HTTP request failed, please check you internet connection.')
178     except requests.exceptions.TooManyRedirects:
179         sys.exit('HTTP request failed, please check your proxy settings.')
180     except requests.exceptions.RequestException as e:
181         # catastrophic error. bail.
182         raise SystemExit(e)
183
184     if response.status_code >= 200 and response.status_code < 300:
185         print('User', user['username'], 'created!')
186     else:
187         print('User creation', user['username'], 'failed!\n', response.text)
188
189 # creates User accounts in realm based a file
190
191
192 def createUsers(token, realmConfig, authConfig):
193     for user in authConfig['users']:
194         createUser(token, realmConfig, user)
195
196     # create a user based on system user
197     systemUser = {
198         "firstName": getpass.getuser(),
199         "lastName": "",
200         "email": getpass.getuser() + "@sdnr.onap.org",
201         "enabled": "true",
202         "username": getpass.getuser(),
203         "credentials": [
204             {
205                 "type": "password",
206                 "value": password,
207                 "temporary": True
208             }
209         ],
210         "requiredActions": [
211             "UPDATE_PASSWORD"
212         ]
213     }
214     createUser(token, realmConfig, systemUser)
215
216 # Grants a role to a user
217
218
219 def addUserRole(user: dict, role: dict, options: dict):
220     url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
221     try:
222         response = requests.post(url, verify=False, json=[
223                                  {'id': role['id'], 'name':role['name']}],
224                                  headers=options['headers'])
225     except requests.exceptions.Timeout:
226         sys.exit('HTTP request failed, please check you internet connection.')
227     except requests.exceptions.TooManyRedirects:
228         sys.exit('HTTP request failed, please check your proxy settings.')
229     except requests.exceptions.RequestException as e:
230         # catastrophic error. bail.
231         raise SystemExit(e)
232
233     if response.status_code >= 200 and response.status_code < 300:
234         print('User role', user['username'], role['name'], 'created!')
235     else:
236         print('Creation of user role',
237               user['username'], role['name'], 'failed!\n', response.text)
238
239 # searches for the role of a given user
240
241
242 def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
243     roleName = 'administration'
244     for grant in authConfig['grants']:
245         if grant['username'] == username:
246             roleName = grant['role']
247     for role in realmConfig['roles']['realm']:
248         if role['name'] == roleName:
249             return role
250     return None
251
252 # adds roles to users
253
254
255 def addUserRoles(token, realmConfig, authConfig):
256     realmId = realmConfig['id']
257     url = base + '/admin/realms/' + realmId + '/users'
258     auth = 'bearer ' + token
259     headers = {
260         'content-type': 'application/json',
261         'accept': 'application/json',
262         'authorization': auth
263     }
264     try:
265         response = requests.get(url, verify=False, headers=headers)
266     except requests.exceptions.Timeout:
267         sys.exit('HTTP request failed, please check you internet connection.')
268     except requests.exceptions.TooManyRedirects:
269         sys.exit('HTTP request failed, please check your proxy settings.')
270     except requests.exceptions.RequestException as e:
271         # catastrophic error. bail.
272         raise SystemExit(e)
273
274     if response.status_code >= 200 and response.status_code < 300:
275         users = response.json()
276         options = {
277             "url": url,
278             "auth": auth,
279             "headers": headers
280         }
281         for user in users:
282             role = findRole(user['username'], authConfig, realmConfig)
283             addUserRole(user, role, options)
284     else:
285         sys.exit('Getting users failed.')
286
287 # main
288
289
290 (realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
291 username = get_environment_variable('ADMIN_USERNAME')
292 password = get_environment_variable('ADMIN_PASSWORD')
293 base = getBaseUrl()
294 isReady(readyTimeout)
295 token = getToken()
296 if token:
297     with open(realmFile) as file:
298         realmConfig = json.load(file)
299         if not checkRealmExists(token, realmConfig['id']):
300             createRealm(token, realmConfig)
301
302         with open(authFile) as authConfig:
303             authConfig = json.load(authConfig)
304             createUsers(token, realmConfig, authConfig)
305         addUserRoles(token, realmConfig, authConfig)
306     exit(0)
307 exit(1)