--- /dev/null
+# Copyright (c) 2019 AT&T Intellectual Property. #\r
+# #\r
+# Licensed under the Apache License, Version 2.0 (the "License"); #\r
+# you may not use this file except in compliance with the License. #\r
+# You may obtain a copy of the License at #\r
+# #\r
+# http://www.apache.org/licenses/LICENSE-2.0 #\r
+# #\r
+# Unless required by applicable law or agreed to in writing, software #\r
+# distributed under the License is distributed on an "AS IS" BASIS, #\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #\r
+# See the License for the specific language governing permissions and #\r
+# limitations under the License. #\r
+################################################################################\r
+# File name: dmaap-vth.py #\r
+# Description: vth that utilize dmaap to subscribe and publish to topics #\r
+# Date created: 02/21/2020 #\r
+# Last modified: 04/02/2020 #\r
+# Python Version: 3.7 #\r
+# Author: Jackie Chen (jv246a) #\r
+# Email: jv246a@att.com #\r
+################################################################################\r
+\r
+import datetime\r
+from configparser import ConfigParser\r
+import os\r
+import logging\r
+from logging import FileHandler\r
+import requests\r
+from flask import Flask, request, jsonify\r
+\r
+# redirect http to https\r
+app = Flask(__name__)\r
+\r
+# Prevents print statement every time an endpoint is triggered.\r
+logging.getLogger("werkzeug").setLevel(logging.WARNING)\r
+\r
+\r
+def sendCallback(url, data):\r
+ try:\r
+ if type(data) is not dict:\r
+ data = {"msg": data}\r
+ app.logger.info("sending callback")\r
+ requests.post(url, json= data)\r
+ except Exception as e:\r
+ app.logger.info(e)\r
+ return\r
+\r
+def unix_time_millis(dt):\r
+ epoch = datetime.datetime.utcfromtimestamp(0)\r
+ return (dt - epoch).total_seconds() * 1000.0\r
+\r
+\r
+def _get_request_data():\r
+ if not request.is_json:\r
+ raise ValueError("request must be json")\r
+ requestData = request.get_json()\r
+ return requestData\r
+\r
+\r
+def _get_config(config_file_name):\r
+ config = ConfigParser(os.environ)\r
+ config.read(config_file_name)\r
+ return config\r
+\r
+def _validate_request(request_data, isPublish=True):\r
+ missing_params = []\r
+\r
+ if 'topic_name' not in request_data:\r
+ missing_params.append("topic_name")\r
+ if isPublish:\r
+ if 'data' not in request_data:\r
+ missing_params.append('data')\r
+ else:\r
+ if 'consumer_group' not in request_data:\r
+ missing_params.append('consumer_group')\r
+ if 'consumer_id' not in request_data:\r
+ missing_params.append('consumer_id')\r
+\r
+ if missing_params:\r
+ err_msg = '{} request requires the following: '.format('publish' if isPublish else 'subscribe')\r
+ err_msg += ','.join(missing_params)\r
+ raise KeyError(err_msg)\r
+\r
+\r
+def _build_url(config, request_data, is_publish=True):\r
+ if is_publish:\r
+ base_path = config['resource']['base_address'] + config['resource']['publish']\r
+ topic_name = request_data['topic_name']\r
+ publish_address = base_path.format(topic_name=topic_name)\r
+ return publish_address\r
+\r
+ base_path = config['resource']['base_address'] + config['resource']['subscribe']\r
+ topic_name = request_data['topic_name']\r
+ consumer_group = request_data['consumer_group']\r
+ consumer_id = request_data['consumer_id']\r
+ subscribe_address = base_path.format(topic_name=topic_name, consumer_group=consumer_group, consumer_id=consumer_id)\r
+ if ('timeout' in request_data):\r
+ subscribe_address = (subscribe_address + '?timeout={}').format(request_data['timeout'])\r
+ return subscribe_address\r
+\r
+\r
+def _send_request(url, config, is_subscribe_request=False, payload=None):\r
+ # setup default values\r
+ auth_enabled = config.getboolean('auth', 'auth_enabled')\r
+ proxy_enabled = config.getboolean('resource', 'proxy_enabled')\r
+ username = ''\r
+ password = ''\r
+ req_proxies = {\r
+ 'http': None,\r
+ 'https': None\r
+ }\r
+ # place proxy and authentication information\r
+ if auth_enabled:\r
+ username = config['auth']['username']\r
+ password = config['auth']['password']\r
+ if proxy_enabled:\r
+ req_proxies['http'] = config['resource']['http_proxy']\r
+ req_proxies['https'] = config['resource']['https_proxy']\r
+\r
+ # for subscribe request\r
+ if is_subscribe_request:\r
+ return requests.get(url,\r
+ auth=(username, password) if auth_enabled else None,\r
+ proxies=req_proxies if proxy_enabled else None)\r
+ # for publish request\r
+ req_headers = {'Content-type': 'application/json'}\r
+ return requests.post(url,\r
+ json=payload,\r
+ auth=(username, password) if auth_enabled else None,\r
+ proxies=req_proxies if proxy_enabled else None,\r
+ headers=req_headers)\r
+\r
+@app.route("/otf/vth/oran/dmaap/v1/health", methods=['GET'])\r
+def getHealth():\r
+ return 'UP'\r
+\r
+@app.route("/otf/vth/oran/dmaap/v1/subscribe", methods=["POST"])\r
+def subscribeRequest():\r
+ response_data = {\r
+ 'vthResponse': {\r
+ 'testDuration': '',\r
+ 'dateTimeUTC': str(datetime.datetime.now()),\r
+ 'abstractMessage': '',\r
+ 'resultData': {}\r
+ }\r
+ }\r
+ ret_url = request.args.get('retURL')\r
+ startTime = unix_time_millis(datetime.datetime.now())\r
+ try:\r
+ # validate request\r
+ request_data = _get_request_data()\r
+ _validate_request(request_data, isPublish=False)\r
+ app.logger.info("incoming subscribe request w/ the following payload:" + str(request_data))\r
+\r
+ # setup phase\r
+ config = _get_config('config.ini')\r
+ subscribe_address = _build_url(config, request_data, is_publish=False)\r
+\r
+ # build response\r
+ app.logger.info('Sending GET to subscribe')\r
+ res = _send_request(subscribe_address, config, is_subscribe_request=True)\r
+ app.logger.info('Response received from subscribe: {}'.format(res.json()))\r
+ response_data['vthResponse']['abstractMessage'] = 'Result from subscribe request'\r
+ response_data['vthResponse']['resultData']['status_code'] = res.status_code\r
+ response_data['vthResponse']['resultData']['result_output'] = res.json()\r
+ except Exception as ex:\r
+ endTime = unix_time_millis(datetime.datetime.now())\r
+ totalTime = endTime - startTime\r
+ response_data['vthResponse']['testDuration'] = totalTime\r
+ response_data['vthResponse']['abstractMessage'] = 'error: ' + str(ex)\r
+ app.logger.error('ERROR:{}'.format(str(ex)))\r
+ return jsonify(response_data)\r
+\r
+ endTime = unix_time_millis(datetime.datetime.now())\r
+ totalTime = endTime - startTime\r
+ response_data['vthResponse']['testDuration'] = totalTime\r
+ if ret_url is not None:\r
+ sendCallback(ret_url,response_data)\r
+ return '',200\r
+ return jsonify(response_data), 200\r
+\r
+\r
+@app.route("/otf/vth/oran/dmaap/v1/publish", methods=['POST'])\r
+def publishRequest():\r
+ response_data = {\r
+ 'vthResponse': {\r
+ 'testDuration': '',\r
+ 'dateTimeUTC': str(datetime.datetime.now()),\r
+ 'abstractMessage': '',\r
+ 'resultData': {}\r
+ }\r
+ }\r
+ startTime = unix_time_millis(datetime.datetime.now())\r
+ ret_url = request.args.get('retURL')\r
+\r
+ try:\r
+ # validate request\r
+ request_data = _get_request_data()\r
+ _validate_request(request_data)\r
+ app.logger.info("incoming publish request w/ the following payload:" + str(request_data))\r
+\r
+ # setup phase\r
+ config = _get_config('config.ini')\r
+ payload = request_data['data']\r
+ publish_address = _build_url(config, request_data)\r
+\r
+ # build response\r
+ app.logger.info("Sending POST to publish")\r
+ res = _send_request(url=publish_address, config=config, payload=payload)\r
+ app.logger.info("Response received from publish: {}".format(res.json()))\r
+ response_data['vthResponse']['abstractMessage'] = 'Result from publish request'\r
+ response_data['vthResponse']['resultData']['status_code'] = res.status_code\r
+ response_data['vthResponse']['resultData']['result_output'] = res.json()\r
+ except Exception as ex:\r
+ endTime = unix_time_millis(datetime.datetime.now())\r
+ totalTime = endTime - startTime\r
+ response_data['vthResponse']['testDuration'] = totalTime\r
+ response_data['vthResponse']['abstractMessage'] = 'error: ' + str(ex)\r
+ app.logger.error('ERROR:{}'.format(str(ex)))\r
+ return jsonify(response_data)\r
+\r
+ endTime = unix_time_millis(datetime.datetime.now())\r
+ totalTime = endTime - startTime\r
+ response_data['vthResponse']['testDuration'] = totalTime\r
+ if ret_url is not None:\r
+ sendCallback(ret_url,response_data)\r
+ return '',200\r
+ return jsonify(response_data), 200\r
+\r
+if __name__ == '__main__':\r
+ logHandler = FileHandler('dmaap-vth.log', mode='a')\r
+ logHandler.setLevel(logging.INFO)\r
+ app.logger.setLevel(logging.INFO)\r
+ app.logger.addHandler(logHandler)\r
+ # context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')\r
+ # app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)\r
+ app.run(debug=False, host='0.0.0.0', port=5000)\r