From: josephthaliath
Date: Tue, 18 Oct 2022 10:33:15 +0000 (+0530)
Subject: Main file for data extraction
X-Git-Tag: 1.0.0~12
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=d1643476e22736b20a842d3aecc76de96e386fe4;p=aiml-fw%2Fathp%2Fdata-extraction.git

Main file for data extraction

Issue-Id: AIMLFW-2
Signed-off-by: josephthaliath
Change-Id: Idef89915da682e4a5393f165ea00301c11fe9e1c
Signed-off-by: josephthaliath
---

diff --git a/dataextraction/main.py b/dataextraction/main.py
new file mode 100644
index 0000000..6b256fa
--- /dev/null
+++ b/dataextraction/main.py
@@ -0,0 +1,191 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+Module providing the REST interface to access the
+Feature Engineering Pipeline.
+"""
+import traceback
+import datetime
+import json
+import threading
+import queue
+
+import jsonpickle
+from flask import Flask
+from flask_restful import request
+from flask_api import status
+from SparkHelper import sparkSessionManager
+from ConfigHelper import ConfigHelper
+from FeatureEngineeringFactory import FeatureEngineeringFactory
+
+fsConf = ConfigHelper()
+logger = fsConf.getLogger()
+session_helper = sparkSessionManager()
+factory = FeatureEngineeringFactory(session_helper)
+tasks = queue.Queue()
+task_map = {}
+
+class Task():
+    """
+    Holds a submitted data extraction request and its execution status.
+    """
+    def __init__(self, request_payload, task_status):
+        self.task = request_payload
+        self.status = task_status
+        self.error = None
+
+app = Flask(__name__)
+
+@app.route('/feature-groups', methods=['POST'])
+def post_handle():
+    """
+    Creates and executes a data pipeline.
+
+    Args:
+        source (dict): the data source for the pipeline, e.g. InfluxDB
+        transform (dict): the transformation applied for feature
+            modification, e.g. SQL, VectorAssembler
+        sink (dict): the feature filter values defined for each
+            training job, e.g. CassandraSink
+
+    Returns:
+        json dict: the result of the data extraction task
+        status: HTTP status 200 or 500
+
+    Raises:
+        Exception:
+            If the supplied "trainingjob_name" or list of features is empty,
+            if one or more supplied features do not exist in the data lake,
+            or if the data lake or feature store connection is down.
+    """
+    start_time = datetime.datetime.now()
+    logger.debug(str(start_time) + " Call started")
+    request_json = request.get_json(force=True)
+    logger.debug("Got json list: " + str(request_json))
+    response_code = status.HTTP_200_OK
+    # Pre-initialise task_id so the response can be built even when the
+    # request JSON is missing the sink section.
+    task_id = ""
+    try:
+        task_id = str(request_json["sink"]["CassandraSink"]["CollectionName"])
+        api_result_msg = "/task-status/" + task_id
+        logger.debug("Generated ID: " + task_id)
+        tasks.put(task_id)
+        task_map[task_id] = Task(request_json, "Accepted")
+    except Exception as exc:  # pylint: disable=broad-except
+        api_result_msg = str(exc)
+        response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        logger.error('ERROR in data extraction: ' + str(api_result_msg))
+        logger.error(str(traceback.format_exc()))
+
+    response = app.response_class(
+        response=json.dumps({"trainingjob_name": task_id,
+                             "result": api_result_msg}),
+        status=response_code, mimetype='application/json')
+    end_time = datetime.datetime.now()
+    logger.info(str(end_time - start_time) + ' API call finished')
+    return response
+
+@app.route('/task-status/<task_id>', methods=['GET'])
+def get_task_status(task_id):
+    """
+    Returns the status of a single task.
+    """
+    try:
+        taskstatus = task_map[task_id].status
+        response_code = status.HTTP_200_OK
+        api_result_msg = "Data Pipeline Execution " + taskstatus
+        if taskstatus == "Error":
+            response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+            api_result_msg = task_map[task_id].error
+    except Exception as exc:  # pylint: disable=broad-except
+        response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        api_result_msg = str(exc)
+        taskstatus = "Error"
+
+    response = app.response_class(
+        response=json.dumps({"task_status": taskstatus,
+                             "result": api_result_msg}),
+        status=response_code, mimetype='application/json')
+    return response
+
+@app.route('/task-statuses', methods=['GET'])
+def get_task_statuses():
+    """
+    Returns the status of every known task.
+    """
+    try:
+        response_code = status.HTTP_200_OK
+        response = jsonpickle.encode(task_map)
+    except Exception as exc:  # pylint: disable=broad-except
+        response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        response = str(exc)
+
+    response = app.response_class(response,
+                                  status=response_code, mimetype='application/json')
+    return response
+
+@app.route('/delete-task-status/<task_id>', methods=['DELETE'])
+def delete_task_status(task_id):
+    """
+    Deletes the status record of a task.
+    """
+    response_code = status.HTTP_200_OK
+    try:
+        api_result_msg = jsonpickle.encode(task_map[task_id])
+        task_map.pop(task_id)
+    except Exception as exc:  # pylint: disable=broad-except
+        response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        api_result_msg = str(exc)
+    response = app.response_class(
+        response=json.dumps({"trainingjob_name": task_id,
+                             "result": api_result_msg}),
+        status=response_code, mimetype='application/json')
+    return response
+
+def async_code_worker():
+    """
+    Async code worker: an infinite loop that retrieves and processes the
+    tasks queued for data extraction.
+    """
+    while True:
+        task_id = ""
+        try:
+            start_time = datetime.datetime.now()
+            logger.debug(str(start_time) + " Feature Engineering Pipeline started")
+            task_id = tasks.get()
+            request_json = task_map[task_id].task
+            task_map[task_id].status = "In Progress"
+            source_dict = request_json["source"]
request_json["source"] + transform_dict = request_json["transform"] + sink_dict = request_json["sink"] + c_key = str(source_dict)+str(transform_dict)+str(sink_dict) + logger.debug(c_key) + feature_engineering_pipeline = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, c_key) + session = session_helper.getSession() + feature_engineering_pipeline.loadData(session) + feature_engineering_pipeline.transformData(session) + feature_engineering_pipeline.writeData(session) + session_helper.stop() + task_map[task_id].status = "Completed" + tasks.task_done() + end_time = datetime.datetime.now() + logger.debug(str(end_time) +"Feature Engineering Pipline Ended") + except Exception as exc: + session_helper.stop() + traceback.print_exc() + logger.error('ERROR in processing task id:'+task_id+" Error:"+str(exc)) + api_result_msg = str(exc) + task_map[task_id].status = "Error" + task_map[task_id].error = api_result_msg +if __name__ == "__main__": + print("******Initiaizing feature store API ******" ) + threading.Thread(target=async_code_worker, daemon=True).start() + app.run(host=fsConf.getFsHost(), port = fsConf.getFsPort(), debug=True)