X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=it%2Fotf.git;a=blobdiff_plain;f=otf-ping-test-head%2FotfPingTestHead.py;fp=otf-ping-test-head%2FotfPingTestHead.py;h=116303458231ce998125d4648f0a8281c3d289a0;hp=0000000000000000000000000000000000000000;hb=f49bd1efeaaddd4891c1f329b18d8cfb28b3e75b;hpb=5d8b2580c97e466f9c5a6c78693c24277d94244c diff --git a/otf-ping-test-head/otfPingTestHead.py b/otf-ping-test-head/otfPingTestHead.py new file mode 100644 index 0000000..1163034 --- /dev/null +++ b/otf-ping-test-head/otfPingTestHead.py @@ -0,0 +1,196 @@ +# Copyright (c) 2019 AT&T Intellectual Property. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +################################################################################ + +from flask import Flask, request, make_response, jsonify, g +import json +import uuid +import traceback +import pyping +import paramiko +import socket +import os +import subprocess +import datetime +import logging +from logging import FileHandler + +#redirect http to https +app = Flask(__name__) + + +# Prevents print statement every time an endpoint is triggered. +logging.getLogger("werkzeug").setLevel(logging.WARNING) + +def unix_time_millis(dt): + epoch = datetime.datetime.utcfromtimestamp(0) + return (dt - epoch).total_seconds() * 1000.0 + +def pingServer(targetHost): + try: + response = subprocess.check_output( + ['ping', '-c', '1', targetHost], # execute the ping command + stderr = subprocess.STDOUT, # retrieve all the output + universal_newlines = True # return as string + ) + except subprocess.CalledProcessError as e: + app.logger.error(e) + app.logger.error('failed getting response from ' + str(targetHost)) + response = None + + return response + +@app.route("/otf/vth/ping/v1/health", methods = ['GET']) +def getHealth(): + return "UP" + +@app.route('/otf/vth/sample/v1', methods = ['POST']) +def sample(): + startTime = unix_time_millis(datetime.datetime.now()) + responseData = { + "vthResponse": { + "testDurationMS": "", + "dateTimeUTC": "", + "abstractMessage": "Success", + "resultData": {} + } + } + responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now()) + endTime = unix_time_millis(datetime.datetime.now()) + responseData['vthResponse']['testDurationMS'] = endTime - startTime + responseData['vthResponse']['resultData']['result'] = "Executed test successfully in " + str(responseData['vthResponse']['testDurationMS']) + " milliseconds." + app.logger.info('hit sample endpoint. response: ' + str(responseData)) + return jsonify(responseData) + +@app.route('/otf/vth/ping/v1', methods = ['POST']) +def testHead(): + responseData = { + "vthResponse": { + "testDurationMS": "", + "dateTimeUTC": "", + "abstractMessage": "", + "resultData": {} + } + } + + responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now()) + startTime = unix_time_millis(datetime.datetime.now()) + + try: + if not request.is_json: + raise ValueError('Request must be a valid JSON object.') + + requestData = request.get_json() + app.logger.info('ping endpoint. request: ' + str(requestData)) + + if 'vthInput' in requestData: + vthInput = requestData['vthInput'] + expectedKeys = ['vthName', 'testConfig', 'testData'] + receivedKeys = vthInput.keys(); + testData = "" + testConfig = "" + + if sorted(expectedKeys) == sorted(receivedKeys): + testData = vthInput['testData'] + + # Check if a target host is provided. + if 'targetHost' not in testData: + raise KeyError('targetHost is required to ping server.') + + # Check if the target host IP address is in the correct format. + # This excludes IPv6. Use IPy to check both IPv6/IPv4. + try: + socket.inet_aton(testData['targetHost']) + except socket.error: + raise ValueError('Invalid IP address assigned to targetHost') + + # Don't use a jump server by default. + if 'useJumpServer' not in testData: + testData['useJumpServer'] = False + else: + raise ValueError('Missing one or more expected keys: {expectedKeys}.'.format(expectedKeys = expectedKeys)) + + if testData['useJumpServer'] == False: + responseData['vthResponse']['resultData']['result'] = pingServer(testData['targetHost']) + else: + testConfig = vthInput['testConfig'] + + if 'jumpServer' not in testConfig: + raise KeyError('Cannot use jump server when jumpServer key is missing.') + + jumpServer = testConfig['jumpServer'] + + if 'host' not in testConfig['jumpServer']: + raise KeyError('Missing host value in jumpServer.') + + host = testConfig['jumpServer']['host'] + + if 'credentials' not in jumpServer: + raise KeyError('Missing credentials in jumpServer.') + + credentials = jumpServer['credentials'] + + if 'username' not in credentials: + raise KeyError('Missing username in credentials.') + + username = credentials['username'] + + if 'password' not in credentials: + raise KeyError('Missing password in credentials.') + + password = credentials['password'] + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(host, username = username, password = password) + command = "ping -c 1 " + testData['targetHost'] + ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command) + output = ssh_stdout.read() + error = ssh_stderr.read() + + responseData['vthResponse']['resultData']['result'] = output + else: + raise KeyError('Missing vthInput parameter(s)') + + # record the end time of the test + endTime = unix_time_millis(datetime.datetime.now()) + + # Calculate the total duration of the test + totalTime = endTime - startTime + + # Set the test duration in the result + responseData['vthResponse']['testDurationMS'] = totalTime + + responseData['vthResponse']['abstractMessage'] = 'Result from pinging {host}'.format(host = testData['targetHost']) + app.logger.info('ping endpoint. response: ' + str(responseData)) + + return jsonify(responseData) + except Exception as e: + app.logger.info(e) + responseData['vthResponse']['abstractMessage'] = str(e) + resp = make_response(json.dumps(responseData)) + endTime = unix_time_millis(datetime.datetime.now()) + + totalTime = endTime - startTime + return resp + +if __name__ == '__main__': + logHandler = FileHandler('otf/logs/pingVTH.log', mode='a') + # logHandler = FileHandler('pingVTH.log', mode='a') + logHandler.setLevel(logging.INFO) + app.logger.setLevel(logging.INFO) + app.logger.addHandler(logHandler) + context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem') + app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context) + # app.run(debug = False, host = '0.0.0.0', port = 5000)