Changes for the Create Feature Group 58/10958/1
author     rajdeep11 <rajdeep.sin@samsung.com>
           Thu, 20 Apr 2023 05:27:28 +0000 (10:57 +0530)
committer  rajdeep11 <rajdeep.sin@samsung.com>
           Thu, 20 Apr 2023 05:29:20 +0000 (10:59 +0530)
Issue-Id: AIMLFW-40

Change-Id: Ib0f17eaa01967d6f64f79e16a6e31030cf5bcd1d
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
trainingmgr/common/trainingmgr_util.py
trainingmgr/db/common_db_fun.py
trainingmgr/db/trainingmgr_ps_db.py
trainingmgr/trainingmgr_main.py

index 9ff1ad9..f47ea3c 100644 (file)
@@ -157,6 +157,34 @@ def check_trainingjob_data(trainingjob_name, json_data):
             arguments, query_filter, enable_versioning, pipeline_version,
             datalake_source, _measurement, bucket)
 
+def check_featureGroup_data(json_data):
+    """
+    This function validates the json_data dictionary and returns a tuple containing
+    the values of the required keys in json_data.
+    """
+    try:
+        if check_key_in_dictionary(["featureGroupName", "feature_list",
+                                    "datalake_source", "enable_Dme", "DmeHost",
+                                    "DmePort", "bucket", "token", "source_name",
+                                    "dbOrg"], json_data):
+            featureGroup_name = json_data["featureGroupName"]
+            feature_list = json_data["feature_list"]
+            datalake_source = json_data["datalake_source"]
+            enable_Dme = json_data["enable_Dme"]
+            DmeHost = json_data["DmeHost"]
+            DmePort = json_data["DmePort"]
+            bucket = json_data["bucket"]
+            token = json_data["token"]
+            source_name = json_data["source_name"]
+            db_org = json_data["dbOrg"]
+        else:
+            raise TMException("check_featureGroup_data - supplied data doesn't have" +
+                              " all the required fields")
+    except Exception as err:
+        raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
+
+    return (featureGroup_name, feature_list, datalake_source, enable_Dme, DmeHost,
+            DmePort, bucket, token, source_name, db_org)
 
 def get_one_key(dictionary):
     '''
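For illustration, a request body that passes check_featureGroup_data could look like the sketch below; every value is made up and not taken from this change.

    # Hypothetical payload for check_featureGroup_data; all values are illustrative.
    example_json_data = {
        "featureGroupName": "fg1",
        "feature_list": "feature1, feature2",
        "datalake_source": "InfluxSource",
        "enable_Dme": True,
        "DmeHost": "dme-host",
        "DmePort": "31823",
        "bucket": "UEData",
        "token": "<influx-token>",
        "source_name": "src1",
        "dbOrg": "primary"
    }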
index 329138c..484def8 100644 (file)
@@ -26,6 +26,7 @@ from trainingmgr.constants.states import States
 from trainingmgr.common.exceptions_utls import DBException
 
 tm_table_name = "trainingjob_info" # Table used by 'Training Manager' for training jobs
+fg_table_name = "featuregroup_info" # Table used by 'Training Manager' for feature groups
 DB_QUERY_EXEC_ERROR = "Failed to execute query in "
 
 def get_data_extraction_in_progress_trainingjobs(ps_db_obj):
@@ -558,6 +559,31 @@ def add_update_trainingjob(description, pipeline_name, experiment_name, feature_
          if conn is not None:
                  conn.close()
 
+def add_featuregroup(featureGroup_name, feature_list, datalake_source, enable_Dme,
+                     ps_db_obj, DmeHost="", DmePort="", bucket="", token="",
+                     source_name="", db_org=""):
+    """
+        This function adds a new row with the given feature group information.
+    """
+    conn = None
+    try:
+        conn = ps_db_obj.get_new_conn()
+        cursor = conn.cursor()
+        cursor.execute('''INSERT INTO {} VALUES '''.format(fg_table_name) +
+                       '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
+                       (featureGroup_name, feature_list, datalake_source, enable_Dme,
+                        DmeHost, DmePort, bucket, token, source_name, db_org))
+        conn.commit()
+        cursor.close()
+    except Exception as err:
+        if conn is not None:
+            conn.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR + "add_featuregroup: " + str(err))
+    finally:
+        if conn is not None:
+            conn.close()
+
+
 def get_all_jobs_latest_status_version(ps_db_obj):
     """
     This function returns True if given trainingjob_name exists in db otherwise
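A minimal usage sketch for add_featuregroup, assuming ps_db_obj is an already-initialized PSDB instance; the argument values are placeholders.

    # Hypothetical call; enable_Dme is False here, so the DME fields keep their
    # empty-string defaults and only the mandatory columns carry real values.
    add_featuregroup("fg1", "feature1, feature2", "InfluxSource", False, ps_db_obj)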
index ed6ae58..5e3d2f6 100644 (file)
@@ -111,6 +111,42 @@ class PSDB():
         finally:
             if conn2 is not None:
                 conn2.close()
+
+        # Create featuregroup_info table
+        conn3 = None
+        try:
+            conn3 = pg8000.dbapi.connect(user=config_hdl.ps_user,
+                                         password=config_hdl.ps_password,
+                                         host=config_hdl.ps_ip,
+                                         port=int(config_hdl.ps_port),
+                                         database="training_manager_database")
+        except pg8000.dbapi.Error:
+            self.__config_hdl.logger.error("Problem connecting to postgres db")
+            raise DBException(PG_DB_ACCESS_ERROR) from None
+        cur3 = conn3.cursor()
+        try:
+            cur3.execute("create table if not exists featuregroup_info(" + \
+                         "featureGroup_name varchar(128) NOT NULL," + \
+                         "feature_list varchar(2000) NOT NULL," + \
+                         "datalake_source varchar(2000) NOT NULL," + \
+                         "enable_dme BOOLEAN NOT NULL," + \
+                         "DmeHost varchar(128) NOT NULL," + \
+                         "DmePort varchar(128) NOT NULL," + \
+                         "bucket varchar(128) NOT NULL," + \
+                         "token varchar(2000) NOT NULL," + \
+                         "source_name varchar(2000) NOT NULL," + \
+                         "db_org varchar(128) NOT NULL," + \
+                         "PRIMARY KEY (featureGroup_name)" + \
+                         ")")
+            conn3.commit()
+            cur3.close()
+        except pg8000.dbapi.Error as err:
+            conn3.rollback()
+            self.__config_hdl.logger.error("Can't create featuregroup_info table.")
+            raise DBException("Can't create featuregroup_info table. " + str(err)) from None
+        finally:
+            if conn3 is not None:
+                conn3.close()
 
     def get_new_conn(self):
         """
index d0b9fa1..213d3dd 100644 (file)
@@ -33,13 +33,13 @@ import requests
 from flask_cors import cross_origin
 from werkzeug.utils import secure_filename
 from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status
+from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
     check_key_in_dictionary, get_one_key, \
     response_for_training, get_metrics, \
     handle_async_feature_engineering_status_exception_case, \
-    validate_trainingjob_name, get_all_pipeline_names_svc
+    validate_trainingjob_name, get_all_pipeline_names_svc, check_featureGroup_data
 from trainingmgr.common.exceptions_utls import APIException,TMException
 from trainingmgr.constants.steps import Steps
 from trainingmgr.constants.states import States
@@ -49,7 +49,7 @@ from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainin
     change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
     get_info_by_version, \
     get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \
-    update_model_download_url, add_update_trainingjob, \
+    update_model_download_url, add_update_trainingjob, add_featuregroup, \
     get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version
 
 APP = Flask(__name__)
@@ -1134,6 +1134,41 @@ def get_metadata(trainingjob_name):
                                         status=response_code,
                                         mimetype=MIMETYPE_JSON)
 
+@APP.route('/trainingjobs/featureGroup', methods=['POST'])
+@cross_origin()
+def create_feature_group():
+    """
+    This function creates a feature group and, when DME is enabled,
+    creates the corresponding DME filtered-data job.
+    """
+    api_response = {}
+    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+    LOGGER.debug('Feature group create request, ' + json.dumps(request.json))
+
+    try:
+        json_data = request.json
+        (featureGroup_name, features, datalake_source, enable_Dme, DmeHost, DmePort,
+         bucket, token, source_name, db_org) = check_featureGroup_data(json_data)
+        # The features are stored as a single string in the db but have to be
+        # passed to the DME as a list of features, hence the conversion.
+        features_list = [feature.strip() for feature in features.split(",")]
+        add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme,
+                         PS_DB_OBJ, DmeHost, DmePort, bucket, token, source_name, db_org)
+        # TODO: if create_dme_filtered_data_job fails, delete the feature group from the db.
+        if enable_Dme:
+            response = create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name,
+                                                    db_org, bucket, token, features_list,
+                                                    featureGroup_name, DmeHost, DmePort)
+            if response.status_code != 201:
+                api_response = {"Exception": "Cannot create dme job"}
+                response_code = status.HTTP_400_BAD_REQUEST
+            else:
+                api_response = {"result": "Feature Group Created"}
+                response_code = status.HTTP_200_OK
+        else:
+            api_response = {"result": "Feature Group Created"}
+            response_code = status.HTTP_200_OK
+    except Exception as err:
+        api_response = {"Exception": "Failed to create the feature group"}
+        LOGGER.error(str(err))
+
+    return APP.response_class(response=json.dumps(api_response),
+                              status=response_code,
+                              mimetype=MIMETYPE_JSON)
+
+
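For illustration, the new endpoint could be exercised as below; the training manager host and port and all payload values are placeholders, not part of this change.

    # Hypothetical client call against POST /trainingjobs/featureGroup.
    import requests

    payload = {
        "featureGroupName": "fg1",
        "feature_list": "feature1, feature2",
        "datalake_source": "InfluxSource",
        "enable_Dme": False,
        "DmeHost": "", "DmePort": "", "bucket": "", "token": "",
        "source_name": "", "dbOrg": ""
    }
    resp = requests.post("http://127.0.0.1:32002/trainingjobs/featureGroup",
                         json=payload)
    print(resp.status_code, resp.json())  # expect 200 and "Feature Group Created"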
 def async_feature_engineering_status():
     """
     This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status