fce93fd41dd967e4c93e9230696dc3fdb397b96d
[it/otf.git] / dmaap-vth / dmaap_vth.py
1 #   Copyright (c) 2019 AT&T Intellectual Property.                             #\r
2 #                                                                              #\r
3 #   Licensed under the Apache License, Version 2.0 (the "License");            #\r
4 #   you may not use this file except in compliance with the License.           #\r
5 #   You may obtain a copy of the License at                                    #\r
6 #                                                                              #\r
7 #       http://www.apache.org/licenses/LICENSE-2.0                             #\r
8 #                                                                              #\r
9 #   Unless required by applicable law or agreed to in writing, software        #\r
10 #   distributed under the License is distributed on an "AS IS" BASIS,          #\r
11 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #\r
12 #   See the License for the specific language governing permissions and        #\r
13 #   limitations under the License.                                             #\r
14 ################################################################################\r
15 # File name: dmaap-vth.py                                                      #\r
# Description: vth that utilizes dmaap to subscribe and publish to topics      #
17 # Date created: 02/21/2020                                                     #\r
18 # Last modified: 04/02/2020                                                    #\r
19 # Python Version: 3.7                                                          #\r
20 # Author: Jackie Chen (jv246a)                                                 #\r
21 # Email: jv246a@att.com                                                        #\r
22 ################################################################################\r
23 \r
24 import datetime\r
25 from configparser import ConfigParser\r
26 import os\r
27 import logging\r
28 from logging import FileHandler\r
29 import requests\r
30 from flask import Flask, request, jsonify\r
31 \r
# Flask application object; the route decorators below register endpoints on it.
# NOTE(review): the previous comment here said "redirect http to https", but
# nothing in this file performs such a redirect.
app = Flask(__name__)

# Raise the werkzeug logger threshold to WARNING so that it does not print a
# line every time an endpoint is triggered.
logging.getLogger("werkzeug").setLevel(logging.WARNING)
37 \r
38 \r
def sendCallback(url, data, timeout=30):
    """POST a result payload to a caller-supplied callback URL (best effort).

    Args:
        url: Address to which the payload is posted.
        data: Payload to send. Anything that is not a dict (including
            dict subclasses being accepted as-is) is wrapped as
            ``{"msg": data}`` so the receiver always gets a JSON object.
        timeout: Seconds to wait for the callback host before giving up.
            Without a bound, an unresponsive host would block the request
            handler indefinitely.

    Returns:
        None. Delivery failures are logged and never propagated, because the
        callback is a notification and must not fail the originating request.
    """
    if not isinstance(data, dict):
        data = {"msg": data}
    try:
        app.logger.info("sending callback")
        response = requests.post(url, json=data, timeout=timeout)
        # A completed exchange can still be a rejection; make that visible.
        if response.status_code >= 400:
            app.logger.error(
                'callback to {} returned status {}'.format(url, response.status_code))
    except Exception as e:
        # Deliberately broad: any transport/serialization problem is logged
        # (at error level, so it is not lost among routine messages).
        app.logger.error('callback to {} failed: {}'.format(url, e))
    return
48 \r
def unix_time_millis(dt):
    """Return the number of milliseconds between the Unix epoch and ``dt``.

    Args:
        dt: A ``datetime.datetime``. A naive value is measured against the
            naive epoch ``1970-01-01 00:00:00`` exactly as given (no timezone
            conversion is applied). A timezone-aware value is measured against
            the UTC epoch, so its offset is taken into account.

    Returns:
        float: Milliseconds since the epoch (may be fractional or negative).
    """
    if dt.utcoffset() is not None:
        # Aware datetimes cannot be subtracted from a naive epoch (TypeError),
        # so use an aware epoch for them.
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    else:
        # Same value as the deprecated datetime.utcfromtimestamp(0).
        epoch = datetime.datetime(1970, 1, 1)
    return (dt - epoch).total_seconds() * 1000.0
52 \r
53 \r
def _get_request_data():
    """Return the parsed JSON body of the current Flask request.

    Raises:
        ValueError: If the incoming request is not flagged as JSON.
    """
    if request.is_json:
        return request.get_json()
    raise ValueError("request must be json")
59 \r
60 \r
def _get_config(config_file_name):
    """Load an INI file into a ConfigParser seeded with the process environment.

    The environment variables are handed to the parser as its defaults, and
    the named file is then read on top of them.

    Args:
        config_file_name: Path of the INI file to read.

    Returns:
        ConfigParser: The populated parser.
    """
    parser = ConfigParser(defaults=os.environ)
    parser.read(config_file_name)
    return parser
65 \r
def _validate_request(request_data, isPublish=True):
    """Check that a request body carries every field its operation needs.

    A publish request needs ``topic_name`` and ``data``; a subscribe request
    needs ``topic_name``, ``consumer_group`` and ``consumer_id``.

    Args:
        request_data: Container of the request fields (membership-tested).
        isPublish: Truthy to validate as publish, falsy as subscribe.

    Raises:
        KeyError: If any required field is absent; the message lists the
            missing names, comma-separated, in the order given above.
    """
    if isPublish:
        operation = 'publish'
        required = ('topic_name', 'data')
    else:
        operation = 'subscribe'
        required = ('topic_name', 'consumer_group', 'consumer_id')

    absent = [field for field in required if field not in request_data]
    if not absent:
        return

    prefix = '{} request requires the following: '.format(operation)
    raise KeyError(prefix + ','.join(absent))
84 \r
85 \r
def _build_url(config, request_data, is_publish=True):
    """Build the target URL for a publish or subscribe request.

    The URL template is ``config['resource']['base_address']`` concatenated
    with ``config['resource']['publish']`` or ``['subscribe']``; its
    ``{topic_name}``, ``{consumer_group}`` and ``{consumer_id}`` placeholders
    are filled from ``request_data``.

    Args:
        config: Mapping of sections to option mappings; must provide a
            ``resource`` section.
        request_data: Request fields. ``topic_name`` is always read;
            ``consumer_group`` and ``consumer_id`` are read for subscribe.
            An optional ``timeout`` is appended as a query parameter on
            subscribe URLs.
        is_publish: Truthy for the publish URL, falsy for the subscribe URL.

    Returns:
        str: The fully substituted URL.

    Raises:
        KeyError: If a required config option or request field is missing.
    """
    # NOTE(review): request values are inserted verbatim (no URL-encoding),
    # matching the previous behavior; confirm whether callers rely on that.
    if is_publish:
        base_path = config['resource']['base_address'] + config['resource']['publish']
        return base_path.format(topic_name=request_data['topic_name'])

    base_path = config['resource']['base_address'] + config['resource']['subscribe']
    subscribe_address = base_path.format(
        topic_name=request_data['topic_name'],
        consumer_group=request_data['consumer_group'],
        consumer_id=request_data['consumer_id'])
    if 'timeout' in request_data:
        # Format only the suffix. Re-formatting the already-substituted URL
        # would treat any brace in a topic/consumer value as a placeholder.
        subscribe_address += '?timeout={}'.format(request_data['timeout'])
    return subscribe_address
101 \r
102 \r
def _send_request(url, config, is_subscribe_request=False, payload=None):
    """Issue the outbound HTTP call for a subscribe (GET) or publish (POST).

    Basic-auth credentials are attached only when ``auth.auth_enabled`` is
    true, and proxies only when ``resource.proxy_enabled`` is true; otherwise
    ``None`` is passed for the respective argument.

    Args:
        url: Fully built target URL.
        config: Parsed configuration providing the ``auth`` and ``resource``
            sections.
        is_subscribe_request: Truthy to send a GET; otherwise a JSON POST.
        payload: Body sent as JSON on publish; unused for subscribe.

    Returns:
        The response object returned by ``requests.get`` / ``requests.post``.
    """
    # Read both switches up front, in the same order as before.
    use_auth = config.getboolean('auth', 'auth_enabled')
    use_proxy = config.getboolean('resource', 'proxy_enabled')

    credentials = None
    if use_auth:
        credentials = (config['auth']['username'], config['auth']['password'])

    proxy_map = None
    if use_proxy:
        proxy_map = {
            'http': config['resource']['http_proxy'],
            'https': config['resource']['https_proxy'],
        }

    if is_subscribe_request:
        return requests.get(url, auth=credentials, proxies=proxy_map)

    return requests.post(url,
                         json=payload,
                         auth=credentials,
                         proxies=proxy_map,
                         headers={'Content-type': 'application/json'})
133 \r
@app.route("/otf/vth/oran/dmaap/v1/health", methods=['GET'])
def getHealth():
    """Health-check endpoint: always responds with the literal string 'UP'."""
    return 'UP'
137 \r
@app.route("/otf/vth/oran/dmaap/v1/subscribe", methods=["POST"])
def subscribeRequest():
    """Handle a subscribe request: GET messages from a topic and report them.

    The JSON body must contain ``topic_name``, ``consumer_group`` and
    ``consumer_id``; an optional ``timeout`` is forwarded as a query
    parameter. Connection settings are read from ``config.ini``.

    Returns:
        On success without a ``retURL`` query argument: the ``vthResponse``
        JSON with the upstream status code and body, status 200.
        On success with ``retURL``: the ``vthResponse`` is POSTed to that URL
        and an empty 200 response is returned.
        On any error: the ``vthResponse`` JSON with ``abstractMessage`` set
        to ``'error: ...'`` (returned inline; no callback is sent).
    """
    response_data = {
        'vthResponse': {
            'testDuration': '',
            # The field is labelled UTC, so report UTC rather than the
            # server's local time; tzinfo is dropped to keep the
            # "YYYY-MM-DD HH:MM:SS.ffffff" text format.
            'dateTimeUTC': str(datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)),
            'abstractMessage': '',
            'resultData': {}
        }
    }
    ret_url = request.args.get('retURL')
    startTime = unix_time_millis(datetime.datetime.now())
    try:
        # validate request
        request_data = _get_request_data()
        _validate_request(request_data, isPublish=False)
        app.logger.info("incoming subscribe request w/ the following payload:" + str(request_data))

        # setup phase
        config = _get_config('config.ini')
        subscribe_address = _build_url(config, request_data, is_publish=False)

        # build response
        app.logger.info('Sending GET to subscribe')
        res = _send_request(subscribe_address, config, is_subscribe_request=True)
        # Record the status code before touching the body, and decode the
        # body once: a non-JSON reply (e.g. an error page) is reported as
        # text instead of discarding the upstream status.
        response_data['vthResponse']['resultData']['status_code'] = res.status_code
        try:
            result_output = res.json()
        except ValueError:
            result_output = res.text
        app.logger.info('Response received from subscribe: {}'.format(result_output))
        response_data['vthResponse']['abstractMessage'] = 'Result from subscribe request'
        response_data['vthResponse']['resultData']['result_output'] = result_output
    except Exception as ex:
        # Top-level boundary: every failure is turned into an error payload.
        endTime = unix_time_millis(datetime.datetime.now())
        totalTime = endTime - startTime
        response_data['vthResponse']['testDuration'] = totalTime
        response_data['vthResponse']['abstractMessage'] = 'error: ' + str(ex)
        app.logger.error('ERROR:{}'.format(str(ex)))
        return jsonify(response_data)

    endTime = unix_time_millis(datetime.datetime.now())
    totalTime = endTime - startTime
    response_data['vthResponse']['testDuration'] = totalTime
    if ret_url is not None:
        sendCallback(ret_url, response_data)
        return '', 200
    return jsonify(response_data), 200
182 \r
183 \r
@app.route("/otf/vth/oran/dmaap/v1/publish", methods=['POST'])
def publishRequest():
    """Handle a publish request: POST the supplied data to a topic.

    The JSON body must contain ``topic_name`` and ``data``; ``data`` is sent
    as the JSON payload. Connection settings are read from ``config.ini``.

    Returns:
        On success without a ``retURL`` query argument: the ``vthResponse``
        JSON with the upstream status code and body, status 200.
        On success with ``retURL``: the ``vthResponse`` is POSTed to that URL
        and an empty 200 response is returned.
        On any error: the ``vthResponse`` JSON with ``abstractMessage`` set
        to ``'error: ...'`` (returned inline; no callback is sent).
    """
    response_data = {
        'vthResponse': {
            'testDuration': '',
            # The field is labelled UTC, so report UTC rather than the
            # server's local time; tzinfo is dropped to keep the
            # "YYYY-MM-DD HH:MM:SS.ffffff" text format.
            'dateTimeUTC': str(datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)),
            'abstractMessage': '',
            'resultData': {}
        }
    }
    startTime = unix_time_millis(datetime.datetime.now())
    ret_url = request.args.get('retURL')

    try:
        # validate request
        request_data = _get_request_data()
        _validate_request(request_data)
        app.logger.info("incoming publish request w/ the following payload:" + str(request_data))

        # setup phase
        config = _get_config('config.ini')
        payload = request_data['data']
        publish_address = _build_url(config, request_data)

        # build response
        app.logger.info("Sending POST to publish")
        res = _send_request(url=publish_address, config=config, payload=payload)
        # Record the status code before touching the body, and decode the
        # body once: a non-JSON or empty reply is reported as text instead
        # of discarding the upstream status.
        response_data['vthResponse']['resultData']['status_code'] = res.status_code
        try:
            result_output = res.json()
        except ValueError:
            result_output = res.text
        app.logger.info("Response received from publish: {}".format(result_output))
        response_data['vthResponse']['abstractMessage'] = 'Result from publish request'
        response_data['vthResponse']['resultData']['result_output'] = result_output
    except Exception as ex:
        # Top-level boundary: every failure is turned into an error payload.
        endTime = unix_time_millis(datetime.datetime.now())
        totalTime = endTime - startTime
        response_data['vthResponse']['testDuration'] = totalTime
        response_data['vthResponse']['abstractMessage'] = 'error: ' + str(ex)
        app.logger.error('ERROR:{}'.format(str(ex)))
        return jsonify(response_data)

    endTime = unix_time_millis(datetime.datetime.now())
    totalTime = endTime - startTime
    response_data['vthResponse']['testDuration'] = totalTime
    if ret_url is not None:
        sendCallback(ret_url, response_data)
        return '', 200
    return jsonify(response_data), 200
230 \r
if __name__ == '__main__':
    # When run as a script: append INFO-and-above records from the Flask app
    # logger to 'dmaap-vth.log' in the working directory.
    logHandler = FileHandler('dmaap-vth.log', mode='a')
    logHandler.setLevel(logging.INFO)
    app.logger.setLevel(logging.INFO)
    app.logger.addHandler(logHandler)
    # Disabled TLS variant of the startup call (certificate and key paths):
    # context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')
    # app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)
    # Active startup: no ssl_context is passed, listening on all interfaces,
    # port 5000.
    app.run(debug=False, host='0.0.0.0', port=5000)