arguments, query_filter, enable_versioning, pipeline_version,
datalake_source, _measurement, bucket)
+def check_featureGroup_data(json_data):
+ """
+ This function checks validation for json_data dictionary and return tuple which conatins
+ values of different keys in jsn_data.
+ """
+ try:
+    try:
+        if check_key_in_dictionary(["featureGroupName", "feature_list",
+                                    "datalake_source", "enable_Dme", "DmeHost",
+                                    "DmePort", "bucket", "token", "source_name",
+                                    "dbOrg"], json_data):
+            featureGroup_name = json_data["featureGroupName"]
+            feature_list = json_data["feature_list"]
+            datalake_source = json_data["datalake_source"]
+            enable_Dme = json_data["enable_Dme"]
+            DmeHost = json_data["DmeHost"]
+            DmePort = json_data["DmePort"]
+            bucket = json_data["bucket"]
+            token = json_data["token"]
+            source_name = json_data["source_name"]
+            db_org = json_data["dbOrg"]
+        else:
+            raise TMException("check_featureGroup_data - supplied data doesn't have" +
+                              " all the required fields")
+
+ except Exception as err:
+ raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
+
+    return (featureGroup_name, feature_list, datalake_source, enable_Dme, DmeHost,
+            DmePort, bucket, token, source_name, db_org)
def get_one_key(dictionary):
'''
from trainingmgr.common.exceptions_utls import DBException
tm_table_name = "trainingjob_info" # Table used by 'Training Manager' for training jobs
+fg_table_name = "featuregroup_info" # Table used by 'Training Manager' for Feature Groups
DB_QUERY_EXEC_ERROR = "Failed to execute query in "
def get_data_extraction_in_progress_trainingjobs(ps_db_obj):
if conn is not None:
conn.close()
+def add_featuregroup(featureGroup_name, feature_list, datalake_source, enable_Dme,
+                     ps_db_obj, DmeHost="", DmePort="", bucket="", token="",
+                     source_name="", db_org=""):
+ """
+ This function add the new row or update existing row with given information
+ """
+
+    conn = None
+    try:
+        conn = ps_db_obj.get_new_conn()
+        cursor = conn.cursor()
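+        # The ten placeholders below match the featuregroup_info columns in
+        # order; the table name comes from the fg_table_name constant and the
+        # values are bound by the driver, keeping user input out of the SQL.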
+        cursor.execute('''INSERT INTO {} VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''.format(fg_table_name),
+                       (featureGroup_name, feature_list, datalake_source, enable_Dme,
+                        DmeHost, DmePort, bucket, token, source_name, db_org))
+ conn.commit()
+ cursor.close()
+ except Exception as err:
+ if conn is not None:
+ conn.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR +
+                          "add_featuregroup: " + str(err))
+ finally:
+ if conn is not None:
+ conn.close()
+
+
def get_all_jobs_latest_status_version(ps_db_obj):
"""
This function returns True if given trainingjob_name exists in db otherwise
finally:
if conn2 is not None:
conn2.close()
+
+        # Create the featuregroup_info table used to persist feature groups
+        conn3 = None
+ try:
+ conn3 = pg8000.dbapi.connect(user=config_hdl.ps_user,
+ password=config_hdl.ps_password,
+ host=config_hdl.ps_ip,
+ port=int(config_hdl.ps_port),
+ database="training_manager_database")
+ except pg8000.dbapi.Error:
+ self.__config_hdl.logger.error("Problem of connection with postgres db")
+ raise DBException(PG_DB_ACCESS_ERROR) from None
+        cur3 = conn3.cursor()
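+        # Create the feature-group table on start-up if it is missing. Postgres
+        # folds unquoted identifiers to lower case, so the columns are stored
+        # as featuregroup_name, dmehost, dmeport, etc.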
+ try:
+ cur3.execute("create table if not exists featuregroup_info(" + \
+ "featureGroup_name varchar(128) NOT NULL," + \
+ "feature_list varchar(2000) NOT NULL," + \
+ "datalake_source varchar(2000) NOT NULL," + \
+ "enable_dme BOOLEAN NOT NULL," + \
+ "DmeHost varchar(128) NOT NULL," + \
+ "DmePort varchar(128) NOT NULL," + \
+ "bucket varchar(128) NOT NULL," + \
+ "token varchar(2000) NOT NULL," + \
+ "source_name varchar(2000) NOT NULL," + \
+ "db_org varchar(128) NOT NULL," + \
+ "PRIMARY KEY (featureGroup_name)" + \
+ ")")
+ conn3.commit()
+ cur3.close()
+ except pg8000.dbapi.Error as err:
+            conn3.rollback()
+ self.__config_hdl.logger.error("Can't create featuregroup_info table.")
+ raise DBException("Can't create featuregroup_info table.", str(err)) from None
+ finally:
+ if conn3 is not None:
+ conn3.close()
def get_new_conn(self):
"""
from flask_cors import cross_origin
from werkzeug.utils import secure_filename
from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status
+from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
check_key_in_dictionary, get_one_key, \
response_for_training, get_metrics, \
handle_async_feature_engineering_status_exception_case, \
- validate_trainingjob_name, get_all_pipeline_names_svc
+ validate_trainingjob_name, get_all_pipeline_names_svc, check_featureGroup_data
from trainingmgr.common.exceptions_utls import APIException,TMException
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
get_info_by_version, \
get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \
- update_model_download_url, add_update_trainingjob, \
+ update_model_download_url, add_update_trainingjob, add_featuregroup, \
get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version
APP = Flask(__name__)
status=response_code,
mimetype=MIMETYPE_JSON)
+@APP.route('/trainingjobs/featureGroup', methods=['POST'])
+@cross_origin()
+def create_feature_group():
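+    """
+    Creates a feature group from the POSTed JSON body and, when enable_Dme is
+    true, registers a filtered-data job with the DME.
+
+    Illustrative request body (the field values below are examples only):
+    {
+        "featureGroupName": "fg1",
+        "feature_list": "feat1, feat2",
+        "datalake_source": "influxdb",
+        "enable_Dme": true,
+        "DmeHost": "dme-host",
+        "DmePort": "8080",
+        "bucket": "my-bucket",
+        "token": "<influxdb-token>",
+        "source_name": "src1",
+        "dbOrg": "my-org"
+    }
+    """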
+ api_response = {}
+ response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+    LOGGER.debug('Feature group create request: ' + json.dumps(request.json))
+
+ try:
+        json_data = request.json
+        (featureGroup_name, features, datalake_source, enable_Dme, DmeHost, DmePort,
+         bucket, token, source_name, db_org) = check_featureGroup_data(json_data)
+        # Features are stored as a comma-separated string in the db but must be
+        # passed to the DME as a list of features, hence the conversion here.
+        features_list = features.split(", ")
+        add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme,
+                         PS_DB_OBJ, DmeHost, DmePort, bucket, token, source_name, db_org)
+        # TODO: if create_dme_filtered_data_job fails, delete the feature group from the db.
+        if enable_Dme:
+            response = create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name,
+                                                    db_org, bucket, token, features_list,
+                                                    featureGroup_name, DmeHost, DmePort)
+            if response.status_code != 201:
+                api_response = {"Exception": "Cannot create dme job"}
+                response_code = status.HTTP_400_BAD_REQUEST
+            else:
+                api_response = {"result": "Feature Group Created"}
+                response_code = status.HTTP_200_OK
+        else:
+            api_response = {"result": "Feature Group Created"}
+            response_code = status.HTTP_200_OK
+ except Exception as err:
+ err_msg = "Failed to create the feature Group"
+ api_response = {"Exception":err_msg}
+ LOGGER.error(str(err))
+
+ return APP.response_class(response=json.dumps(api_response),
+ status=response_code,
+ mimetype=MIMETYPE_JSON)
+
+
def async_feature_engineering_status():
"""
This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status