def check_featureGroup_data(json_data):
    """
    Validate the feature-group creation payload and unpack its fields.

    Checks that every required key is present in *json_data* and returns the
    corresponding values as a tuple.

    :param json_data: dict parsed from the request body.
    :return: tuple (featureGroup_name, feature_list, datalake_source,
             enable_Dme, DmeHost, DmePort, bucket, token, source_name, db_org)
    :raises APIException: with HTTP 400 when any required field is missing.
    """
    # "dbOrg" belongs in the required-key list: the value is read
    # unconditionally below, so a payload missing it used to fail with a
    # bare KeyError message instead of the descriptive "missing fields" one.
    required_keys = ["featureGroupName", "feature_list", "datalake_source",
                     "enable_Dme", "DmeHost", "DmePort", "bucket", "token",
                     "source_name", "dbOrg"]
    try:
        if check_key_in_dictionary(required_keys, json_data):
            featureGroup_name = json_data["featureGroupName"]
            feature_list = json_data["feature_list"]
            datalake_source = json_data["datalake_source"]
            enable_Dme = json_data["enable_Dme"]
            DmeHost = json_data["DmeHost"]
            DmePort = json_data["DmePort"]
            bucket = json_data["bucket"]
            token = json_data["token"]
            source_name = json_data["source_name"]
            db_org = json_data["dbOrg"]
        else:
            raise TMException("check_featuregroup_data- supplied data doesn't have" + \
                              " all the required fields ")
    except Exception as err:
        # Any validation failure is surfaced to the caller as a 400.
        raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None

    return (featureGroup_name, feature_list, datalake_source, enable_Dme,
            DmeHost, DmePort, bucket, token, source_name, db_org)
tm_table_name = "trainingjob_info"  # Table used by 'Training Manager' for training jobs
fg_table_name = "featuregroup_info"  # Table used by 'Training Manager' for Feature Groups
DB_QUERY_EXEC_ERROR = "Failed to execute query in "


def add_featuregroup(featureGroup_name, feature_list, datalake_source, enable_Dme, ps_db_obj , DmeHost="", DmePort="", bucket="", token="", source_name="", db_org=""):
    """
    Insert a new feature-group row into the feature-group table.

    Note: this is a plain INSERT — a duplicate featureGroup_name violates the
    table's primary key and surfaces as a DBException; no upsert is attempted.

    :param featureGroup_name: primary-key name of the feature group.
    :param feature_list: features stored as a single comma-separated string.
    :param datalake_source: datalake backend identifier.
    :param enable_Dme: whether a DME job is associated with this group.
    :param ps_db_obj: PSDB instance used to obtain a fresh connection.
    :param DmeHost, DmePort, bucket, token, source_name, db_org: optional
        DME/datalake connection details (empty strings when DME is disabled).
    :raises DBException: when the insert cannot be executed.
    """
    conn = ps_db_obj.get_new_conn()
    cursor = None
    try:
        cursor = conn.cursor()
        # The table name comes from the module-level constant (never user
        # input), so str.format is safe here; all values are parameterized.
        cursor.execute(''' INSERT INTO {} VALUES '''.format(fg_table_name) +
                       '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
                       (featureGroup_name, feature_list, datalake_source, enable_Dme,
                        DmeHost, DmePort, bucket, token, source_name, db_org))
        conn.commit()
    except Exception as err:
        conn.rollback()
        raise DBException(DB_QUERY_EXEC_ERROR +
                          "add_featuregroup" + str(err))
    finally:
        # Close the cursor on every path (the original leaked it on error),
        # then release the connection.
        if cursor is not None:
            cursor.close()
        conn.close()
host=config_hdl.ps_ip, + port=int(config_hdl.ps_port), + database="training_manager_database") + except pg8000.dbapi.Error: + self.__config_hdl.logger.error("Problem of connection with postgres db") + raise DBException(PG_DB_ACCESS_ERROR) from None + cur3= conn3.cursor() + try: + cur3.execute("create table if not exists featuregroup_info(" + \ + "featureGroup_name varchar(128) NOT NULL," + \ + "feature_list varchar(2000) NOT NULL," + \ + "datalake_source varchar(2000) NOT NULL," + \ + "enable_dme BOOLEAN NOT NULL," + \ + "DmeHost varchar(128) NOT NULL," + \ + "DmePort varchar(128) NOT NULL," + \ + "bucket varchar(128) NOT NULL," + \ + "token varchar(2000) NOT NULL," + \ + "source_name varchar(2000) NOT NULL," + \ + "db_org varchar(128) NOT NULL," + \ + "PRIMARY KEY (featureGroup_name)" + \ + ")") + conn3.commit() + cur3.close() + except pg8000.dbapi.Error as err: + conn2.rollback() + self.__config_hdl.logger.error("Can't create featuregroup_info table.") + raise DBException("Can't create featuregroup_info table.", str(err)) from None + finally: + if conn3 is not None: + conn3.close() def get_new_conn(self): """ diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index d0b9fa1..213d3dd 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -33,13 +33,13 @@ import requests from flask_cors import cross_origin from werkzeug.utils import secure_filename from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk -from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status +from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job from trainingmgr.common.trainingmgr_config import TrainingMgrConfig from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \ check_key_in_dictionary, get_one_key, \ response_for_training, get_metrics, \ 
@APP.route('/trainingjobs/featureGroup', methods=['POST'])
@cross_origin()
def create_feature_group():
    """
    Create a feature group.

    Validates the request payload, persists the feature group in the db and,
    when DME is enabled, registers a filtered-data job with the DME.

    Returns 200 on success, 400 when the DME job cannot be created and 500
    on any other failure.
    """
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    try:
        # request.json may raise for a malformed body, so it is read (and
        # logged) inside the try block; the original logged it beforehand,
        # letting a bad body escape the handler entirely.
        json_data = request.json
        LOGGER.debug('feature Group Create request, ' + json.dumps(json_data))

        (featureGroup_name, features, datalake_source, enable_Dme, DmeHost,
         DmePort, bucket, token, source_name, db_org) = check_featureGroup_data(json_data)

        add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme,
                         PS_DB_OBJ, DmeHost, DmePort, bucket, token, source_name, db_org)

        # TODO: if the create_dme_job fails delete the feature group in the db.
        if enable_Dme == True:
            # Features are stored as a single string in the db but the DME
            # expects a list of features, hence the conversion (only needed
            # on this branch).
            features_list = features.split(", ")
            response = create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name,
                                                    db_org, bucket, token, features_list,
                                                    featureGroup_name, DmeHost, DmePort)
            if response.status_code != 201:
                api_response = {"Exception": "Cannot create dme job"}
                response_code = status.HTTP_400_BAD_REQUEST
            else:
                api_response = {"result": "Feature Group Created"}
                response_code = status.HTTP_200_OK
        else:
            api_response = {"result": "Feature Group Created"}
            response_code = status.HTTP_200_OK
    except Exception as err:
        # response_code stays at the 500 it was initialized to.
        api_response = {"Exception": "Failed to create the feature Group"}
        LOGGER.error(str(err))

    return APP.response_class(response=json.dumps(api_response),
                              status=response_code,
                              mimetype=MIMETYPE_JSON)