# Copyright (c) 2019 AT&T Intellectual Property. #
#
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
#
# http://www.apache.org/licenses/LICENSE-2.0 #
#
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
################################################################################
# File name: dmaap-vth.py #
# Description: VTH that uses DMaaP to subscribe and publish to topics #
# Date created: 02/21/2020 #
# Last modified: 04/02/2020 #
# Python Version: 3.7 #
# Author: Jackie Chen (jv246a) #
# Email: jv246a@att.com #
################################################################################
\r
import datetime
import logging
import os
from configparser import ConfigParser
from logging import FileHandler
from urllib.parse import quote

import requests
from flask import Flask, request, jsonify
\r
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)

# Raise werkzeug's logger to WARNING so it does not print a line every time
# an endpoint is triggered.
logging.getLogger("werkzeug").setLevel(logging.WARNING)
\r
def sendCallback(url, data, timeout=30):
    """POST a result payload to a caller-supplied callback URL (best effort).

    Args:
        url: Address the payload is POSTed to.
        data: Payload to send. Anything that is not a dict is wrapped as
            ``{"msg": data}`` so the receiver always gets a JSON object.
        timeout: Seconds to wait for the callback endpoint before giving up,
            so an unresponsive receiver cannot block the request forever.

    Returns:
        None. Any failure is logged and swallowed: a broken callback
        endpoint must not turn an otherwise completed request into an error.
    """
    payload = data if isinstance(data, dict) else {"msg": data}
    try:
        app.logger.info("sending callback")
        requests.post(url, json=payload, timeout=timeout)
    except Exception as e:
        # NOTE(review): the original handler body was not present in the
        # reviewed extract; logging the failure is the assumed intent.
        app.logger.error("callback to {} failed: {}".format(url, e))
\r
def unix_time_millis(dt):
    """Return the number of milliseconds from the Unix epoch to ``dt``.

    Args:
        dt: A ``datetime.datetime``. A naive value is measured against a
            naive 1970-01-01 00:00:00; a timezone-aware value is measured
            against 1970-01-01 00:00:00 UTC.

    Returns:
        float: Elapsed milliseconds (negative for instants before the epoch).
    """
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        # Aware and naive datetimes cannot be subtracted from each other,
        # so pick an epoch of the same kind as the argument.
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    else:
        epoch = datetime.datetime(1970, 1, 1)
    return (dt - epoch).total_seconds() * 1000.0
\r
def _get_request_data():
    """Return the decoded JSON body of the current Flask request.

    Returns:
        The value produced by ``request.get_json()``.

    Raises:
        ValueError: If the request is not flagged as JSON by Flask
            (``request.is_json`` is false).
    """
    if not request.is_json:
        raise ValueError("request must be json")
    return request.get_json()
\r
def _get_config(config_file_name):
    """Load the INI configuration, with environment variables as defaults.

    Args:
        config_file_name: Path of the INI file to read.

    Returns:
        ConfigParser: Parser populated from the file. Every environment
        variable is available in each section as a default (option names
        are lower-cased, as ConfigParser does for all options).

    Raises:
        FileNotFoundError: If the file could not be read. Without this check
            a missing file only surfaces later as an unrelated missing
            section error.
    """
    # ConfigParser lower-cases option names and, in its default strict mode,
    # rejects duplicates. Environments that define both ``http_proxy`` and
    # ``HTTP_PROXY`` would therefore fail, so collapse such names first
    # (the first one encountered wins).
    defaults = {}
    for name, value in os.environ.items():
        defaults.setdefault(name.lower(), value)

    config = ConfigParser(defaults)
    # read() returns the list of files it managed to parse.
    if not config.read(config_file_name):
        raise FileNotFoundError(
            "unable to read config file: {}".format(config_file_name))
    return config
\r
def _validate_request(request_data, isPublish=True):
    """Check that a request body carries every required parameter.

    A publish request needs ``topic_name`` and ``data``; a subscribe request
    needs ``topic_name``, ``consumer_group`` and ``consumer_id``.

    Args:
        request_data: Decoded JSON body of the request.
        isPublish: True to validate a publish request, False for subscribe.

    Raises:
        ValueError: If the body is not a JSON object (dict); membership
            tests on a list or string would otherwise give false results.
        KeyError: If required parameters are missing. The message lists all
            of them, comma separated.
    """
    if not isinstance(request_data, dict):
        raise ValueError("request body must be a json object")

    required_params = ['topic_name']
    if isPublish:
        required_params.append('data')
    else:
        required_params.extend(['consumer_group', 'consumer_id'])

    missing_params = [name for name in required_params if name not in request_data]
    if missing_params:
        err_msg = '{} request requires the following: '.format('publish' if isPublish else 'subscribe')
        err_msg += ','.join(missing_params)
        raise KeyError(err_msg)
\r
def _build_url(config, request_data, is_publish=True):
    """Build the target address for a publish or subscribe request.

    The address is ``config['resource']['base_address']`` followed by the
    ``publish`` or ``subscribe`` template from the same section, with the
    ``{topic_name}`` (and, for subscribe, ``{consumer_group}`` and
    ``{consumer_id}``) placeholders filled from ``request_data``. For
    subscribe, ``?timeout=<value>`` is appended when the request carries a
    ``timeout`` entry.

    Args:
        config: Mapping-like configuration with a ``resource`` section.
        request_data: Validated request body.
        is_publish: True for the publish address, False for subscribe.

    Returns:
        str: The fully substituted address.
    """
    def _segment(value):
        # Values come from the request body; percent-encode them so they
        # cannot introduce extra path segments or query parameters.
        return quote(str(value), safe='')

    resource = config['resource']
    topic_name = _segment(request_data['topic_name'])

    if is_publish:
        base_path = resource['base_address'] + resource['publish']
        return base_path.format(topic_name=topic_name)

    base_path = resource['base_address'] + resource['subscribe']
    subscribe_address = base_path.format(
        topic_name=topic_name,
        consumer_group=_segment(request_data['consumer_group']),
        consumer_id=_segment(request_data['consumer_id']),
    )
    if 'timeout' in request_data:
        # Format only the suffix: re-formatting the already substituted
        # address would choke on any brace contained in a substituted value.
        subscribe_address += '?timeout={}'.format(_segment(request_data['timeout']))
    return subscribe_address
\r
def _send_request(url, config, is_subscribe_request=False, payload=None, timeout=None):
    """Send the subscribe (GET) or publish (POST) request.

    Args:
        url: Fully built target address.
        config: ConfigParser holding the ``auth`` section (``auth_enabled``,
            ``username``, ``password``) and the ``resource`` section
            (``proxy_enabled``, ``http_proxy``, ``https_proxy``).
        is_subscribe_request: True sends a GET; False sends a POST carrying
            ``payload`` as JSON.
        payload: Body of the publish request; ignored for subscribe.
        timeout: Optional timeout handed to ``requests``. ``None`` keeps the
            library default of waiting indefinitely.

    Returns:
        requests.Response: The response of the GET or POST.
    """
    auth_enabled = config.getboolean('auth', 'auth_enabled')
    proxy_enabled = config.getboolean('resource', 'proxy_enabled')

    # Credentials are only read from the config when authentication is on.
    credentials = None
    if auth_enabled:
        credentials = (config['auth']['username'], config['auth']['password'])

    # NOTE(review): the default initialisation was missing from the reviewed
    # extract; explicit None entries per scheme are assumed here - confirm.
    req_proxies = {'http': None, 'https': None}
    if proxy_enabled:
        req_proxies['http'] = config['resource']['http_proxy']
        req_proxies['https'] = config['resource']['https_proxy']

    if is_subscribe_request:
        return requests.get(url,
                            auth=credentials,
                            proxies=req_proxies,
                            timeout=timeout)

    req_headers = {'Content-type': 'application/json'}
    return requests.post(url,
                         json=payload,
                         auth=credentials,
                         proxies=req_proxies,
                         headers=req_headers,
                         timeout=timeout)
\r
@app.route("/otf/vth/oran/dmaap/v1/health", methods=['GET'])
def getHealth():
    """Health-check endpoint: answer GET requests with a fixed string."""
    # NOTE(review): only the route decorator survived in the reviewed
    # extract; the handler name and the 'UP' body are reconstructed and must
    # be confirmed. Without a handler of its own this route would be stacked
    # onto the subscribe endpoint defined next.
    return 'UP'
\r
@app.route("/otf/vth/oran/dmaap/v1/subscribe", methods=["POST"])
def subscribeRequest():
    """Handle a subscribe request: GET messages from a topic.

    The JSON body must contain ``topic_name``, ``consumer_group`` and
    ``consumer_id`` and may contain ``timeout``. When the ``retURL`` query
    argument is present, the result is also POSTed to that address.

    Returns:
        A JSON document of the form ``{'vthResponse': {...}}`` holding the
        test duration in milliseconds, a creation timestamp, a summary
        message and, on success, the status code and body received. Any
        failure is reported in ``abstractMessage`` instead of being raised.
    """
    response_data = {
        'vthResponse': {
            'testDuration': '',
            # The key promises UTC, so take the current UTC time; tzinfo is
            # dropped to keep the plain "YYYY-MM-DD HH:MM:SS.ffffff" format.
            'dateTimeUTC': str(datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)),
            'abstractMessage': '',
            'resultData': {}
        }
    }
    vth_response = response_data['vthResponse']
    ret_url = request.args.get('retURL')
    startTime = unix_time_millis(datetime.datetime.now())

    try:
        # validate the incoming request
        request_data = _get_request_data()
        _validate_request(request_data, isPublish=False)
        app.logger.info("incoming subscribe request w/ the following payload:" + str(request_data))

        # resolve the target address
        config = _get_config('config.ini')
        subscribe_address = _build_url(config, request_data, is_publish=False)

        # perform the call and record its outcome
        app.logger.info('Sending GET to subscribe')
        res = _send_request(subscribe_address, config, is_subscribe_request=True)
        try:
            # Decode once; a non-JSON body is still a result worth returning.
            result_output = res.json()
        except ValueError:
            result_output = res.text
        app.logger.info('Response received from subscribe: {}'.format(result_output))
        vth_response['abstractMessage'] = 'Result from subscribe request'
        vth_response['resultData']['status_code'] = res.status_code
        vth_response['resultData']['result_output'] = result_output
    except Exception as ex:
        endTime = unix_time_millis(datetime.datetime.now())
        vth_response['testDuration'] = endTime - startTime
        vth_response['abstractMessage'] = 'error: ' + str(ex)
        app.logger.error('ERROR:{}'.format(str(ex)))
        return jsonify(response_data)

    endTime = unix_time_millis(datetime.datetime.now())
    vth_response['testDuration'] = endTime - startTime
    if ret_url is not None:
        # NOTE(review): one line after this call was missing from the
        # reviewed extract; the response is returned to the caller as well.
        sendCallback(ret_url, response_data)

    return jsonify(response_data), 200
\r
@app.route("/otf/vth/oran/dmaap/v1/publish", methods=['POST'])
def publishRequest():
    """Handle a publish request: POST a payload to a topic.

    The JSON body must contain ``topic_name`` and ``data``; ``data`` is what
    gets published. When the ``retURL`` query argument is present, the
    result is also POSTed to that address.

    Returns:
        A JSON document of the form ``{'vthResponse': {...}}`` holding the
        test duration in milliseconds, a creation timestamp, a summary
        message and, on success, the status code and body received. Any
        failure is reported in ``abstractMessage`` instead of being raised.
    """
    response_data = {
        'vthResponse': {
            'testDuration': '',
            # The key promises UTC, so take the current UTC time; tzinfo is
            # dropped to keep the plain "YYYY-MM-DD HH:MM:SS.ffffff" format.
            'dateTimeUTC': str(datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)),
            'abstractMessage': '',
            'resultData': {}
        }
    }
    vth_response = response_data['vthResponse']
    startTime = unix_time_millis(datetime.datetime.now())
    ret_url = request.args.get('retURL')

    try:
        # validate the incoming request
        request_data = _get_request_data()
        _validate_request(request_data)
        app.logger.info("incoming publish request w/ the following payload:" + str(request_data))

        # resolve the target address and the payload to publish
        config = _get_config('config.ini')
        payload = request_data['data']
        publish_address = _build_url(config, request_data)

        # perform the call and record its outcome
        app.logger.info("Sending POST to publish")
        res = _send_request(url=publish_address, config=config, payload=payload)
        try:
            # Decode once; a non-JSON body is still a result worth returning.
            result_output = res.json()
        except ValueError:
            result_output = res.text
        app.logger.info("Response received from publish: {}".format(result_output))
        vth_response['abstractMessage'] = 'Result from publish request'
        vth_response['resultData']['status_code'] = res.status_code
        vth_response['resultData']['result_output'] = result_output
    except Exception as ex:
        endTime = unix_time_millis(datetime.datetime.now())
        vth_response['testDuration'] = endTime - startTime
        vth_response['abstractMessage'] = 'error: ' + str(ex)
        app.logger.error('ERROR:{}'.format(str(ex)))
        return jsonify(response_data)

    endTime = unix_time_millis(datetime.datetime.now())
    vth_response['testDuration'] = endTime - startTime
    if ret_url is not None:
        # NOTE(review): one line after this call was missing from the
        # reviewed extract; the response is returned to the caller as well.
        sendCallback(ret_url, response_data)

    return jsonify(response_data), 200
\r
if __name__ == '__main__':
    # Append application log records of level INFO and above to a local file.
    logHandler = FileHandler('dmaap-vth.log', mode='a')
    logHandler.setLevel(logging.INFO)
    # Without a formatter the file would contain bare messages only; add the
    # timestamp and level so entries can be ordered and filtered.
    logHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    app.logger.setLevel(logging.INFO)
    app.logger.addHandler(logHandler)
    # Served over plain HTTP on all interfaces. To serve HTTPS instead, pass
    # ssl_context=('opt/cert/otf.pem', 'opt/cert/privateKey.pem') to app.run.
    app.run(debug=False, host='0.0.0.0', port=5000)
\r