# Copyright (c) 2019 AT&T Intellectual Property. # # # # Licensed under the Apache License, Version 2.0 (the "License"); # # you may not use this file except in compliance with the License. # # You may obtain a copy of the License at # # # # http://www.apache.org/licenses/LICENSE-2.0 # # # # Unless required by applicable law or agreed to in writing, software # # distributed under the License is distributed on an "AS IS" BASIS, # # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # # See the License for the specific language governing permissions and # # limitations under the License. # ################################################################################ # File name: a1-mediator-vth.py # # Description: vth for a1-mediator-vth service # # Date created: 08/22/2019 # # Last modified: 04/02/2020 # # Python Version: 3.7.4 # # Author: Jackie Chen (jv246a) # # Email: jv246a@att.com # ################################################################################ import datetime import json import logging from logging import FileHandler import os import requests from flask import Flask, request, jsonify # redirect http to https app = Flask(__name__) # Prevents print statement every time an endpoint is triggered. logging.getLogger("werkzeug").setLevel(logging.WARNING) def sendCallback(url, data): try: if type(data) is not dict: data = {"msg": data} app.logger.info("sending callback") requests.post(url, json=data) except Exception as e: app.logger.info(e) return def unix_time_millis(dt): epoch = datetime.datetime.utcfromtimestamp(0) return (dt - epoch).total_seconds() * 1000.0 @app.route("/otf/vth/oran/a1/v1/health", methods=['GET']) def getHealth(): return "UP" @app.route("/otf/vth/oran/a1/v1", methods=['POST']) def executeRicRequest(): response_data = { 'vthResponse': { 'testDuration': '', 'dateTimeUTC': str(datetime.datetime.now()), 'abstractMessage': '', 'resultData': {} } } startTime = unix_time_millis(datetime.datetime.now()) ret_url = request.args.get('retURL') try: if not request.is_json: raise ValueError("request must be json") requestData = request.get_json() app.logger.info("A1 requestData:" + str(requestData)) action = requestData['action'].lower() _check_incoming_request(requestData) os.environ['NO_PROXY'] = '*' with open('config.json') as configFile: config = json.load(configFile) baseAddress = config['base_address'] if action == 'health_check' or action == 'list_policy': res = requests.get(baseAddress + config['actions_path'][action]) response_data['vthResponse']['resultData']['statusCode'] = res.status_code if action == 'health_check': response_data['vthResponse']['resultData']['resultOutput'] = res.text else: response_data['vthResponse']['resultData']['resultOutput'] = res.json() elif action == 'list_policy_instance': res = requests.get(baseAddress + config['actions_path'][action] .format(policy_type_id=requestData['policy_type_id'])) response_data['vthResponse']['resultData']['statusCode'] = res.status_code response_data['vthResponse']['resultData']['resultOutput'] = res.json() elif action == 'get_policy_instance_status': res = requests.get(baseAddress + config['actions_path'][action] .format(policy_type_id=requestData['policy_type_id'], policy_instance_id=requestData['policy_instance_id'])) response_data['vthResponse']['resultData']['statusCode'] = res.status_code response_data['vthResponse']['resultData']['resultOutput'] = res.json() elif action == 'edit_policy': res = _send_edit_request(requestData, config) response_data['vthResponse']['resultData']['statusCode'] = res.status_code if requestData['request_type'].lower() == 'get' and res.status_code == 200: response_data['vthResponse']['resultData']['resultOutput'] = res.json() else: response_data['vthResponse']['resultData']['resultOutput'] = res.text elif action == 'edit_policy_instance': res = _send_edit_request(requestData, config) response_data['vthResponse']['resultData']['statusCode'] = res.status_code if requestData['request_type'].lower() == 'get' and res.status_code == 200: response_data['vthResponse']['resultData']['resultOutput'] = res.json() else: response_data['vthResponse']['resultData']['resultOutput'] = res.text except Exception as ex: endTime = unix_time_millis(datetime.datetime.now()) totalTime = endTime - startTime response_data['vthResponse']['testDuration'] = totalTime response_data['vthResponse']['abstractMessage'] = str(ex) return jsonify(response_data) endTime = unix_time_millis(datetime.datetime.now()) totalTime = endTime - startTime response_data['vthResponse']['testDuration'] = totalTime if ret_url is not None: sendCallback(ret_url, response_data) return '', 200 return jsonify(response_data), 200 def _send_edit_request(request_data, config): baseAddress = config['base_address'] path = '' action = request_data['action'] policy_type_id = request_data['policy_type_id'] request_type = request_data['request_type'] if action == "edit_policy": path = baseAddress + config['actions_path'][action].format(policy_type_id=policy_type_id) if action == 'edit_policy_instance': instance_id = request_data['policy_instance_id'] path = baseAddress + config['actions_path'][action].format(policy_type_id=policy_type_id, policy_instance_id=instance_id) if request_type == 'get': return requests.get(path) if request_type == 'put': payload = request_data['payload'] return requests.put(path, json=payload) if request_type == 'delete': return requests.delete(path) def _check_incoming_request(requestData): # check if the request is valid if 'action' not in requestData: raise KeyError('no action was specify') action = requestData['action'].lower() edit_actions = ['edit_policy', 'edit_policy_instance'] requires_policy_id = ['edit_policy', 'list_policy_instance' , 'edit_policy_instance', 'get_policy_instance_status'] requires_policy_instance_id = ['edit_policy_instance', 'get_policy_instance_status'] possible_actions = ['health_check', 'list_policy', 'edit_policy', 'list_policy_instance' , 'edit_policy_instance', 'get_policy_instance_status'] possible_request_type = ['get', 'put', 'delete'] if action not in possible_actions: raise KeyError("invalid action") if action in edit_actions: # request type is required if 'request_type' not in requestData: raise KeyError('this action: ' + action + ' requires a request type') if requestData['request_type'] not in possible_request_type: raise KeyError('this request_type: ' + requestData['request_type'] + ' is not valid') if requestData['request_type'] == 'put' and 'payload' not in requestData: raise KeyError('put request requires a payload') if action in requires_policy_id: if 'policy_type_id' not in requestData: raise KeyError('this action: ' + action + ' requires a policy_type_id') if action in requires_policy_instance_id: if 'policy_instance_id' not in requestData: raise KeyError('this action: ' + action + ' requires a policy_instance_id') if __name__ == '__main__': logHandler = FileHandler('a1-mediator-vth.log', mode='a') logHandler.setLevel(logging.INFO) app.logger.setLevel(logging.INFO) app.logger.addHandler(logHandler) # context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem') # app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context) app.run(debug=False, host='0.0.0.0', port=5000)