1 # Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ################################################################################
\r
16 from flask import Flask, request, make_response, jsonify, g
\r
27 from logging import FileHandler
\r
29 #redirect http to https
\r
30 app = Flask(__name__)
\r
33 # Prevents print statement every time an endpoint is triggered.
\r
34 logging.getLogger("werkzeug").setLevel(logging.WARNING)
\r
36 def unix_time_millis(dt):
\r
37 epoch = datetime.datetime.utcfromtimestamp(0)
\r
38 return (dt - epoch).total_seconds() * 1000.0
\r
40 def pingServer(targetHost):
\r
42 response = subprocess.check_output(
\r
43 ['ping', '-c', '1', targetHost], # execute the ping command
\r
44 stderr = subprocess.STDOUT, # retrieve all the output
\r
45 universal_newlines = True # return as string
\r
47 except subprocess.CalledProcessError as e:
\r
49 app.logger.error('failed getting response from ' + str(targetHost))
\r
54 @app.route("/otf/vth/ping/v1/health", methods = ['GET'])
\r
58 @app.route('/otf/vth/sample/v1', methods = ['POST'])
\r
60 startTime = unix_time_millis(datetime.datetime.now())
\r
63 "testDurationMS": "",
\r
65 "abstractMessage": "Success",
\r
69 responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now())
\r
70 endTime = unix_time_millis(datetime.datetime.now())
\r
71 responseData['vthResponse']['testDurationMS'] = endTime - startTime
\r
72 responseData['vthResponse']['resultData']['result'] = "Executed test successfully in " + str(responseData['vthResponse']['testDurationMS']) + " milliseconds."
\r
73 app.logger.info('hit sample endpoint. response: ' + str(responseData))
\r
74 return jsonify(responseData)
\r
76 @app.route('/otf/vth/ping/v1', methods = ['POST'])
\r
80 "testDurationMS": "",
\r
82 "abstractMessage": "",
\r
87 responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now())
\r
88 startTime = unix_time_millis(datetime.datetime.now())
\r
91 if not request.is_json:
\r
92 raise ValueError('Request must be a valid JSON object.')
\r
94 requestData = request.get_json()
\r
95 app.logger.info('ping endpoint. request: ' + str(requestData))
\r
97 if 'vthInput' in requestData:
\r
98 vthInput = requestData['vthInput']
\r
99 expectedKeys = ['vthName', 'testConfig', 'testData']
\r
100 receivedKeys = vthInput.keys();
\r
104 if sorted(expectedKeys) == sorted(receivedKeys):
\r
105 testData = vthInput['testData']
\r
107 # Check if a target host is provided.
\r
108 if 'targetHost' not in testData:
\r
109 raise KeyError('targetHost is required to ping server.')
\r
111 # Check if the target host IP address is in the correct format.
\r
112 # This excludes IPv6. Use IPy to check both IPv6/IPv4.
\r
114 socket.inet_aton(testData['targetHost'])
\r
115 except socket.error:
\r
116 raise ValueError('Invalid IP address assigned to targetHost')
\r
118 # Don't use a jump server by default.
\r
119 if 'useJumpServer' not in testData:
\r
120 testData['useJumpServer'] = False
\r
122 raise ValueError('Missing one or more expected keys: {expectedKeys}.'.format(expectedKeys = expectedKeys))
\r
124 if testData['useJumpServer'] == False:
\r
125 responseData['vthResponse']['resultData']['result'] = pingServer(testData['targetHost'])
\r
127 testConfig = vthInput['testConfig']
\r
129 if 'jumpServer' not in testConfig:
\r
130 raise KeyError('Cannot use jump server when jumpServer key is missing.')
\r
132 jumpServer = testConfig['jumpServer']
\r
134 if 'host' not in testConfig['jumpServer']:
\r
135 raise KeyError('Missing host value in jumpServer.')
\r
137 host = testConfig['jumpServer']['host']
\r
139 if 'credentials' not in jumpServer:
\r
140 raise KeyError('Missing credentials in jumpServer.')
\r
142 credentials = jumpServer['credentials']
\r
144 if 'username' not in credentials:
\r
145 raise KeyError('Missing username in credentials.')
\r
147 username = credentials['username']
\r
149 if 'password' not in credentials:
\r
150 raise KeyError('Missing password in credentials.')
\r
152 password = credentials['password']
\r
154 ssh = paramiko.SSHClient()
\r
155 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
\r
156 ssh.connect(host, username = username, password = password)
\r
157 command = "ping -c 1 " + testData['targetHost']
\r
158 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
\r
159 output = ssh_stdout.read()
\r
160 error = ssh_stderr.read()
\r
162 responseData['vthResponse']['resultData']['result'] = output
\r
164 raise KeyError('Missing vthInput parameter(s)')
\r
166 # record the end time of the test
\r
167 endTime = unix_time_millis(datetime.datetime.now())
\r
169 # Calculate the total duration of the test
\r
170 totalTime = endTime - startTime
\r
172 # Set the test duration in the result
\r
173 responseData['vthResponse']['testDurationMS'] = totalTime
\r
175 responseData['vthResponse']['abstractMessage'] = 'Result from pinging {host}'.format(host = testData['targetHost'])
\r
176 app.logger.info('ping endpoint. response: ' + str(responseData))
\r
178 return jsonify(responseData)
\r
179 except Exception as e:
\r
181 responseData['vthResponse']['abstractMessage'] = str(e)
\r
182 resp = make_response(json.dumps(responseData))
\r
183 endTime = unix_time_millis(datetime.datetime.now())
\r
185 totalTime = endTime - startTime
\r
188 if __name__ == '__main__':
\r
189 logHandler = FileHandler('otf/logs/pingVTH.log', mode='a')
\r
190 # logHandler = FileHandler('pingVTH.log', mode='a')
\r
191 logHandler.setLevel(logging.INFO)
\r
192 app.logger.setLevel(logging.INFO)
\r
193 app.logger.addHandler(logHandler)
\r
194 context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')
\r
195 #app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)
\r
196 app.run(debug = False, host = '0.0.0.0', port = 5000)
\r