# Copyright (c) 2019 AT&T Intellectual Property. # # # # Licensed under the Apache License, Version 2.0 (the "License"); # # you may not use this file except in compliance with the License. # # You may obtain a copy of the License at # # # # http://www.apache.org/licenses/LICENSE-2.0 # # # # Unless required by applicable law or agreed to in writing, software # # distributed under the License is distributed on an "AS IS" BASIS, # # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # # See the License for the specific language governing permissions and # # limitations under the License. # ################################################################################ from flask import Flask, request, make_response, jsonify, g import json import uuid import traceback import pyping import paramiko import socket import os import subprocess import datetime import logging from logging import FileHandler #redirect http to https app = Flask(__name__) # Prevents print statement every time an endpoint is triggered. logging.getLogger("werkzeug").setLevel(logging.WARNING) def unix_time_millis(dt): epoch = datetime.datetime.utcfromtimestamp(0) return (dt - epoch).total_seconds() * 1000.0 def pingServer(targetHost): try: response = subprocess.check_output( ['ping', '-c', '1', targetHost], # execute the ping command stderr = subprocess.STDOUT, # retrieve all the output universal_newlines = True # return as string ) except subprocess.CalledProcessError as e: app.logger.error(e) app.logger.error('failed getting response from ' + str(targetHost)) response = None return response @app.route("/otf/vth/ping/v1/health", methods = ['GET']) def getHealth(): return "UP" @app.route('/otf/vth/sample/v1', methods = ['POST']) def sample(): startTime = unix_time_millis(datetime.datetime.now()) responseData = { "vthResponse": { "testDurationMS": "", "dateTimeUTC": "", "abstractMessage": "Success", "resultData": {} } } responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now()) endTime = unix_time_millis(datetime.datetime.now()) responseData['vthResponse']['testDurationMS'] = endTime - startTime responseData['vthResponse']['resultData']['result'] = "Executed test successfully in " + str(responseData['vthResponse']['testDurationMS']) + " milliseconds." app.logger.info('hit sample endpoint. response: ' + str(responseData)) return jsonify(responseData) @app.route('/otf/vth/ping/v1', methods = ['POST']) def testHead(): responseData = { "vthResponse": { "testDurationMS": "", "dateTimeUTC": "", "abstractMessage": "", "resultData": {} } } responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now()) startTime = unix_time_millis(datetime.datetime.now()) try: if not request.is_json: raise ValueError('Request must be a valid JSON object.') requestData = request.get_json() app.logger.info('ping endpoint. request: ' + str(requestData)) if 'vthInput' in requestData: vthInput = requestData['vthInput'] expectedKeys = ['vthName', 'testConfig', 'testData'] receivedKeys = vthInput.keys(); testData = "" testConfig = "" if sorted(expectedKeys) == sorted(receivedKeys): testData = vthInput['testData'] # Check if a target host is provided. if 'targetHost' not in testData: raise KeyError('targetHost is required to ping server.') # Check if the target host IP address is in the correct format. # This excludes IPv6. Use IPy to check both IPv6/IPv4. try: socket.inet_aton(testData['targetHost']) except socket.error: raise ValueError('Invalid IP address assigned to targetHost') # Don't use a jump server by default. if 'useJumpServer' not in testData: testData['useJumpServer'] = False else: raise ValueError('Missing one or more expected keys: {expectedKeys}.'.format(expectedKeys = expectedKeys)) if testData['useJumpServer'] == False: responseData['vthResponse']['resultData']['result'] = pingServer(testData['targetHost']) else: testConfig = vthInput['testConfig'] if 'jumpServer' not in testConfig: raise KeyError('Cannot use jump server when jumpServer key is missing.') jumpServer = testConfig['jumpServer'] if 'host' not in testConfig['jumpServer']: raise KeyError('Missing host value in jumpServer.') host = testConfig['jumpServer']['host'] if 'credentials' not in jumpServer: raise KeyError('Missing credentials in jumpServer.') credentials = jumpServer['credentials'] if 'username' not in credentials: raise KeyError('Missing username in credentials.') username = credentials['username'] if 'password' not in credentials: raise KeyError('Missing password in credentials.') password = credentials['password'] ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host, username = username, password = password) command = "ping -c 1 " + testData['targetHost'] ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command) output = ssh_stdout.read() error = ssh_stderr.read() responseData['vthResponse']['resultData']['result'] = output else: raise KeyError('Missing vthInput parameter(s)') # record the end time of the test endTime = unix_time_millis(datetime.datetime.now()) # Calculate the total duration of the test totalTime = endTime - startTime # Set the test duration in the result responseData['vthResponse']['testDurationMS'] = totalTime responseData['vthResponse']['abstractMessage'] = 'Result from pinging {host}'.format(host = testData['targetHost']) app.logger.info('ping endpoint. response: ' + str(responseData)) return jsonify(responseData) except Exception as e: app.logger.info(e) responseData['vthResponse']['abstractMessage'] = str(e) resp = make_response(json.dumps(responseData)) endTime = unix_time_millis(datetime.datetime.now()) totalTime = endTime - startTime return resp if __name__ == '__main__': logHandler = FileHandler('otf/logs/pingVTH.log', mode='a') # logHandler = FileHandler('pingVTH.log', mode='a') logHandler.setLevel(logging.INFO) app.logger.setLevel(logging.INFO) app.logger.addHandler(logHandler) context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem') app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context) # app.run(debug = False, host = '0.0.0.0', port = 5000)