adding apis for list feature group 75/10975/6
authorrajdeep11 <rajdeep.sin@samsung.com>
Tue, 25 Apr 2023 11:53:26 +0000 (17:23 +0530)
committerrajdeep11 <rajdeep.sin@samsung.com>
Wed, 26 Apr 2023 09:56:02 +0000 (15:26 +0530)
Issue-Id: AIMLFW-40

Change-Id: I9c6cbc1e5449b31aef44902bd74edacad8903e33
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
tests/test_tm_apis.py
trainingmgr/common/trainingmgr_util.py
trainingmgr/db/common_db_fun.py
trainingmgr/trainingmgr_main.py

index 39127af..aeca8bc 100644 (file)
@@ -117,11 +117,11 @@ class Test_trainingjobs_operations:
     @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK")
     def test_trainingjobs_operations_get_exception(self,mock1,mock2):
         trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get exception*******")
-        expected_data = "Failed to fetch training job info from db"
+        expected_data =  b'{"trainingjobs": []}'
         response = self.client.get("/trainingjobs/latest",content_type="application/json")
         trainingmgr_main.LOGGER.debug(response.data)    
-        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
-        assert expected_data in str(response.data)
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+        assert expected_data in response.data
 
 class Test_pipeline_notification:
     def setup_method(self):
index f47ea3c..4a40564 100644 (file)
@@ -167,11 +167,11 @@ def check_featureGroup_data(json_data):
                                     "datalake_source", "enable_Dme", "DmeHost", 
                                     "DmePort", "bucket", "token", "source_name"], json_data):
             featureGroup_name=json_data["featureGroupName"]
-            feature_list=json_data["feature_list"]
+            features=json_data["feature_list"]
             datalake_source=json_data["datalake_source"]
             enable_Dme=json_data["enable_Dme"]
-            DmeHost=json_data["DmeHost"]
-            DmePort=json_data["DmePort"]
+            dme_host=json_data["DmeHost"]
+            dme_port=json_data["DmePort"]
             bucket=json_data["bucket"]
             token=json_data["token"]
             source_name=json_data["source_name"]
@@ -184,7 +184,7 @@ def check_featureGroup_data(json_data):
     except Exception as err:
         raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
     
-    return (featureGroup_name, feature_list, datalake_source, enable_Dme, DmeHost, DmePort, bucket, token, source_name,db_org)
+    return (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port, bucket, token, source_name,db_org)
 
 def get_one_key(dictionary):
     '''
index 46de3e4..1af5184 100644 (file)
@@ -588,7 +588,7 @@ def get_feature_groups_db(ps_db_obj):
     This function returns feature_groups
     """
     conn=None
-    result=None
+    result=()
     try:
         conn=ps_db_obj.get_new_conn()
         cursor=conn.cursor()
@@ -630,6 +630,26 @@ def get_feature_group_by_name_db(ps_db_obj, featuregroup_name):
         if conn is not None:
             conn.close()
     return result 
+
+def delete_feature_group_by_name(ps_db_obj, featuregroup_name):
+    """
+    This function is used to delete the feature group from db
+    """
+    conn=None
+    try:
+        conn=ps_db_obj.get_new_conn()
+        cursor=conn.cursor()
+        cursor.execute('''delete from {} where featuregroup_name =%s'''.format(fg_table_name),(featuregroup_name, ))
+        conn.commit()
+        cursor.close()
+    except Exception as err:
+        if conn is not None:
+            conn.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "delete_feature_group"  + str(err))
+    finally:
+        if conn is not None:
+                conn.close()
      
 def get_all_jobs_latest_status_version(ps_db_obj):
     """
@@ -637,7 +657,7 @@ def get_all_jobs_latest_status_version(ps_db_obj):
     it returns False.
     """
     conn = None
-    results = None
+    results = ()
     try:
         conn = ps_db_obj.get_new_conn()
         cursor = conn.cursor()
index 213d3dd..a04f1b6 100644 (file)
@@ -50,7 +50,8 @@ from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainin
     get_info_by_version, \
     get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \
     update_model_download_url, add_update_trainingjob, add_featuregroup, \
-    get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version
+    get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version, \
+    get_feature_groups_db, get_feature_group_by_name_db, delete_feature_group_by_name
 
 APP = Flask(__name__)
 TRAININGMGR_CONFIG_OBJ = None
@@ -566,18 +567,15 @@ def trainingjobs_operations():
     try:
         results = get_all_jobs_latest_status_version(PS_DB_OBJ)
         trainingjobs = []
-        if results:
-            for res in results:
-                dict_data = {
-                    "trainingjob_name": res[0],
-                    "version": res[1],
-                    "overall_status": get_one_word_status(json.loads(res[2]))
-                }
-                trainingjobs.append(dict_data)
-            api_response = {"trainingjobs": trainingjobs}
-            response_code = status.HTTP_200_OK
-        else :
-            raise TMException("Failed to fetch training job info from db")
+        for res in results:
+            dict_data = {
+                "trainingjob_name": res[0],
+                "version": res[1],
+                "overall_status": get_one_word_status(json.loads(res[2]))
+            }
+            trainingjobs.append(dict_data)
+        api_response = {"trainingjobs": trainingjobs}
+        response_code = status.HTTP_200_OK
     except Exception as err:
         api_response =   {"Exception": str(err)}
         LOGGER.error(str(err))
@@ -1134,24 +1132,70 @@ def get_metadata(trainingjob_name):
                                         status=response_code,
                                         mimetype=MIMETYPE_JSON)
 
-@APP.route('/trainingjobs/featureGroup', methods=['POST'])
+@APP.route('/featureGroup', methods=['POST'])
 @cross_origin()
 def create_feature_group():
+    """
+    Rest endpoint to create feature group
+
+    Args in function:
+                NONE
+
+    Args in json:
+            json with below fields are given:
+                featureGroupName: str
+                    description
+                feature_list: str
+                    feature names
+                enable_Dme: boolean
+                    whether to enable dme
+                source_name: str
+                    name of source
+                dbOrg: str
+                    name of db org
+                bucket: str
+                    bucket name
+                DmePort: str
+                    DME port
+                DmeHost: str
+                    DME Host
+                datalake_source: str
+                    string indicating datalake source
+                token: str
+                    token for the bucket
+
+    Returns:
+        1. For post request
+            json:
+                result : str
+                    result message
+                status code:
+                    HTTP status code 201
+        2. For put request
+            json:
+                result : str
+                    result message
+                status code:
+                    HTTP status code 200
+
+    Exceptions:
+        All exception are provided with exception message and HTTP status code."""
+    
     api_response = {}
     response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
     LOGGER.debug('feature Group Create request, ' + json.dumps(request.json))
 
     try:
         json_data=request.json
-        (featureGroup_name, features, datalake_source, enable_Dme, DmeHost, DmePort, bucket, token, source_name, db_org)=check_featureGroup_data(json_data)
+        (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port, bucket, token, source_name,db_org)=check_featureGroup_data(json_data)
         # the features are stored in string format in the db, and has to be passed as list of feature to the dme. Hence the conversion.
         features_list = features.split(", ")
-        add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme, PS_DB_OBJ,DmeHost, DmePort, bucket, token, source_name,db_org )
-        #todo :  if the create_dme_job fails delete the feature group in the db.
-        if enable_Dme == True :
-            response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket,  token, features_list, featureGroup_name,  DmeHost, DmePort)
+        add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme, PS_DB_OBJ,dme_host, dme_port, bucket, token, source_name,db_org )
+        if enable_Dme == True :   
+            response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket,  token, features_list, featureGroup_name,  dme_host, dme_port)
             if response.status_code != 201:
                 api_response={"Exception": "Cannot create dme job"}
+                delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
                 response_code=status.HTTP_400_BAD_REQUEST
             else:
                 api_response={"result": "Feature Group Created"}
@@ -1160,7 +1204,8 @@ def create_feature_group():
             api_response={"result": "Feature Group Created"}
             response_code =status.HTTP_200_OK    
     except Exception as err:
-        err_msg = "Failed to create the feature Group"
+        delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
+        err_msg = "Failed to create the feature Group "
         api_response = {"Exception":err_msg}
         LOGGER.error(str(err))
     
@@ -1168,6 +1213,130 @@ def create_feature_group():
                                         status=response_code,
                                         mimetype=MIMETYPE_JSON)
 
+@APP.route('/featureGroup', methods=['GET'])
+@cross_origin()
+def get_feature_group():
+    """
+    Rest endpoint to fetch all the feature groups
+
+    Args in function: none
+    Required Args in json:
+        no json required 
+    
+    Returns:
+        json:
+            FeatureGroups: list
+                list of dictionaries.
+                    dictionaries contains:
+                        featuregroup_name: str
+                            name of feature group
+                        features: str
+                            name of features
+                        datalake: str
+                            datalake
+                        dme: boolean
+                            whether to enable dme
+                        
+    """
+    LOGGER.debug("Request for getting all feature groups")
+    api_response={}
+    response_code=status.HTTP_500_INTERNAL_SERVER_ERROR
+    try:
+        result= get_feature_groups_db(PS_DB_OBJ)
+        feature_groups=[]
+        for res in result:
+            dict_data={
+                "featuregroup_name": res[0],
+                "features": res[1],
+                "datalake": res[2],
+                "dme": res[3]                
+                }
+            feature_groups.append(dict_data)
+        api_response={"featuregroups":feature_groups}
+        response_code=status.HTTP_200_OK
+
+    except Exception as err:
+        api_response =   {"Exception": str(err)}
+        LOGGER.error(str(err))
+    return APP.response_class(response=json.dumps(api_response),
+                        status=response_code,
+                        mimetype=MIMETYPE_JSON)
+
+@APP.route('/featureGroup/<featuregroup_name>', methods=['GET'])
+@cross_origin()
+def get_feature_group_by_name(featuregroup_name):
+    """
+    Rest endpoint to fetch a feature group
+
+    Args in function:
+        featuregroup_name: str
+            name of featuregroup_name.
+
+    Returns:
+        json:
+            trainingjob: dict
+                     dictionary contains
+                         featuregroup_name: str
+                             name of featuregroup
+                         features: str
+                             features
+                         datalake: str
+                             name of datalake
+                         dme: str
+                             whether dme enabled or not
+                         dme_host: str
+                             dme host
+                         dme_port: str
+                             dme_port
+                         bucket: str
+                             bucket name
+                         token: str
+                             token for the bucket
+                         source_name: dict
+                             source name
+                         db_org: str
+                             db org
+        status code:
+            HTTP status code 200
+
+    Exceptions:
+        all exception are provided with exception message and HTTP status code.
+
+    """
+    LOGGER.debug("Request for getting a feature group with name = "+ featuregroup_name)
+    api_response={}
+    response_code=status.HTTP_500_INTERNAL_SERVER_ERROR
+    try:
+        result= get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
+        feature_group=[]
+        if result:
+            for res in result:
+                features=res[1].split(", ")
+                dict_data={
+                    "featuregroup_name": res[0],
+                    "features": features,
+                    "datalake": res[2],
+                    "dme": res[3],
+                    "dme_host": res[4],
+                    "dme_port": res[5],
+                    "bucket":res[6],
+                    "token":res[7],
+                    "source_name":res[8],
+                    "db_org":res[9]
+                }
+                feature_group.append(dict_data)
+            api_response={"featuregroup":feature_group}
+            response_code=status.HTTP_200_OK
+        else:
+            response_code=status.HTTP_404_NOT_FOUND
+            raise TMException("Failed to fetch feature group info from db")
+    except Exception as err:
+        api_response =   {"Exception": str(err)}
+        LOGGER.error(str(err))
+    return APP.response_class(response=json.dumps(api_response),
+                        status=response_code,
+                        mimetype=MIMETYPE_JSON) 
+
 
 def async_feature_engineering_status():
     """