1 # Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ################################################################################
\r
17 from flask import Flask, request, make_response, jsonify, g
\r
23 from logging import FileHandler
\r
25 # redirect http to https
\r
26 app = Flask(__name__)
\r
29 # Prevents print statement every time an endpoint is triggered.
\r
30 logging.getLogger("werkzeug").setLevel(logging.WARNING)
\r
33 def unix_time_millis(dt):
\r
34 epoch = datetime.datetime.utcfromtimestamp(0)
\r
35 return (dt - epoch).total_seconds() * 1000.0
\r
38 @app.route("/otf/vth/ssh/v1/health", methods = ['GET'])
\r
43 @app.route('/otf/vth/ssh/v1', methods = ['POST'])
\r
47 "testDurationMS": "",
\r
49 "abstractMessage": "",
\r
54 responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now())
\r
55 start_time = unix_time_millis(datetime.datetime.now())
\r
58 if not request.is_json:
\r
59 raise ValueError('Request must be a valid JSON object.')
\r
61 request_data = request.get_json()
\r
63 if 'vthInput' in request_data:
\r
64 vth_input = request_data['vthInput']
\r
65 expected_keys = ['vthName', 'testConfig', 'testData']
\r
66 received_keys = vth_input.keys();
\r
70 if sorted(expected_keys) == sorted(received_keys):
\r
71 test_data = vth_input['testData']
\r
73 if 'command' not in test_data:
\r
74 raise ValueError('Must supply value testData.command')
\r
77 raise ValueError('Missing one or more expected keys: {expectedKeys}.'.format(expectedKeys=expected_keys))
\r
79 test_config = vth_input['testConfig']
\r
81 if 'jumpServer' not in test_config:
\r
82 raise KeyError('Cannot use jump server when jumpServer key is missing.')
\r
84 jump_server = test_config['jumpServer']
\r
86 if 'host' not in test_config['jumpServer']:
\r
87 raise KeyError('Missing host value in jumpServer.')
\r
89 host = test_config['jumpServer']['host']
\r
91 if 'credentials' not in jump_server:
\r
92 raise KeyError('Missing credentials in jumpServer.')
\r
94 credentials = jump_server['credentials']
\r
96 if 'username' not in credentials:
\r
97 raise KeyError('Missing username in credentials.')
\r
99 username = credentials['username']
\r
101 if 'password' not in credentials:
\r
102 raise KeyError('Missing password in credentials.')
\r
104 password = credentials['password']
\r
106 ssh = paramiko.SSHClient()
\r
107 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
\r
109 if 'usePrivateKey' in test_config:
\r
110 if test_config['usePrivateKey']:
\r
111 key_passphrase = os.environ.get('id_otf_key_passphrase')
\r
112 app.logger.info(key_passphrase)
\r
113 ssh.connect(host, username=username, passphrase='passphrase', key_filename='./ssh/id_otf.key')
\r
114 with open('./ssh/id_otf.key', 'r') as myfile:
\r
115 data = myfile.read().replace('\n', '')
\r
117 app.logger.info(data)
\r
119 ssh.connect(host, username=username, password=password)
\r
120 command = test_data['command']
\r
121 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
\r
123 responseData['vthResponse']['resultData']['output'] = str(ssh_stdout.read()).replace('"', '\\"').replace('\n', '\\n')
\r
124 responseData['vthResponse']['resultData']['error'] = str(ssh_stderr.read()).replace('"', '\\"').replace('\n', '\\n')
\r
127 raise KeyError('Missing vthInput parameter(s)')
\r
129 # record the end time of the test
\r
130 endTime = unix_time_millis(datetime.datetime.now())
\r
132 # Calculate the total duration of the test
\r
133 totalTime = endTime - start_time
\r
135 # Set the test duration in the result
\r
136 responseData['vthResponse']['testDurationMS'] = totalTime
\r
138 responseData['vthResponse']['abstractMessage'] = 'done'
\r
140 app.logger.info(str(responseData))
\r
142 return jsonify(responseData)
\r
143 except Exception as e:
\r
145 responseData['vthResponse']['abstractMessage'] = str(e)
\r
146 resp = make_response(json.dumps(responseData))
\r
147 endTime = unix_time_millis(datetime.datetime.now())
\r
149 totalTime = endTime - start_time
\r
153 if __name__ == '__main__':
\r
154 logHandler = FileHandler('otf/logs/sshVTH.log', mode='a')
\r
155 # logHandler = FileHandler('sshVTH.log', mode='a')
\r
156 logHandler.setLevel(logging.INFO)
\r
157 app.logger.setLevel(logging.INFO)
\r
158 app.logger.addHandler(logHandler)
\r
159 context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')
\r
160 app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)
\r
161 # app.run(debug = False, host = '0.0.0.0', port=5000)
\r