From: rajdeep11 Date: Tue, 25 Apr 2023 11:53:26 +0000 (+0530) Subject: adding apis for list feature group X-Git-Tag: 1.1.0~19 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=22c78aed7aacf284248be524d8dbf59019dee306;p=aiml-fw%2Fawmf%2Ftm.git adding apis for list feature group Issue-Id: AIMLFW-40 Change-Id: I9c6cbc1e5449b31aef44902bd74edacad8903e33 Signed-off-by: rajdeep11 --- diff --git a/tests/test_tm_apis.py b/tests/test_tm_apis.py index 39127af..aeca8bc 100644 --- a/tests/test_tm_apis.py +++ b/tests/test_tm_apis.py @@ -117,11 +117,11 @@ class Test_trainingjobs_operations: @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK") def test_trainingjobs_operations_get_exception(self,mock1,mock2): trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get exception*******") - expected_data = "Failed to fetch training job info from db" + expected_data = b'{"trainingjobs": []}' response = self.client.get("/trainingjobs/latest",content_type="application/json") trainingmgr_main.LOGGER.debug(response.data) - assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal" - assert expected_data in str(response.data) + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + assert expected_data in response.data class Test_pipeline_notification: def setup_method(self): diff --git a/trainingmgr/common/trainingmgr_util.py b/trainingmgr/common/trainingmgr_util.py index f47ea3c..4a40564 100644 --- a/trainingmgr/common/trainingmgr_util.py +++ b/trainingmgr/common/trainingmgr_util.py @@ -167,11 +167,11 @@ def check_featureGroup_data(json_data): "datalake_source", "enable_Dme", "DmeHost", "DmePort", "bucket", "token", "source_name"], json_data): featureGroup_name=json_data["featureGroupName"] - feature_list=json_data["feature_list"] + features=json_data["feature_list"] datalake_source=json_data["datalake_source"] enable_Dme=json_data["enable_Dme"] - DmeHost=json_data["DmeHost"] - DmePort=json_data["DmePort"] + dme_host=json_data["DmeHost"] + dme_port=json_data["DmePort"] bucket=json_data["bucket"] token=json_data["token"] source_name=json_data["source_name"] @@ -184,7 +184,7 @@ def check_featureGroup_data(json_data): except Exception as err: raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None - return (featureGroup_name, feature_list, datalake_source, enable_Dme, DmeHost, DmePort, bucket, token, source_name,db_org) + return (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port, bucket, token, source_name,db_org) def get_one_key(dictionary): ''' diff --git a/trainingmgr/db/common_db_fun.py b/trainingmgr/db/common_db_fun.py index 46de3e4..1af5184 100644 --- a/trainingmgr/db/common_db_fun.py +++ b/trainingmgr/db/common_db_fun.py @@ -588,7 +588,7 @@ def get_feature_groups_db(ps_db_obj): This function returns feature_groups """ conn=None - result=None + result=() try: conn=ps_db_obj.get_new_conn() cursor=conn.cursor() @@ -630,6 +630,26 @@ def get_feature_group_by_name_db(ps_db_obj, featuregroup_name): if conn is not None: conn.close() return result + +def delete_feature_group_by_name(ps_db_obj, featuregroup_name): + """ + This function is used to delete the feature group from db + """ + conn=None + try: + conn=ps_db_obj.get_new_conn() + cursor=conn.cursor() + cursor.execute('''delete from {} where featuregroup_name =%s'''.format(fg_table_name),(featuregroup_name, )) + conn.commit() + cursor.close() + except Exception as err: + if conn is not None: + conn.rollback() + raise DBException(DB_QUERY_EXEC_ERROR + \ + "delete_feature_group" + str(err)) + finally: + if conn is not None: + conn.close() def get_all_jobs_latest_status_version(ps_db_obj): """ @@ -637,7 +657,7 @@ def get_all_jobs_latest_status_version(ps_db_obj): it returns False. """ conn = None - results = None + results = () try: conn = ps_db_obj.get_new_conn() cursor = conn.cursor() diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index 213d3dd..a04f1b6 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -50,7 +50,8 @@ from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainin get_info_by_version, \ get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \ update_model_download_url, add_update_trainingjob, add_featuregroup, \ - get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version + get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version, \ + get_feature_groups_db, get_feature_group_by_name_db, delete_feature_group_by_name APP = Flask(__name__) TRAININGMGR_CONFIG_OBJ = None @@ -566,18 +567,15 @@ def trainingjobs_operations(): try: results = get_all_jobs_latest_status_version(PS_DB_OBJ) trainingjobs = [] - if results: - for res in results: - dict_data = { - "trainingjob_name": res[0], - "version": res[1], - "overall_status": get_one_word_status(json.loads(res[2])) - } - trainingjobs.append(dict_data) - api_response = {"trainingjobs": trainingjobs} - response_code = status.HTTP_200_OK - else : - raise TMException("Failed to fetch training job info from db") + for res in results: + dict_data = { + "trainingjob_name": res[0], + "version": res[1], + "overall_status": get_one_word_status(json.loads(res[2])) + } + trainingjobs.append(dict_data) + api_response = {"trainingjobs": trainingjobs} + response_code = status.HTTP_200_OK except Exception as err: api_response = {"Exception": str(err)} LOGGER.error(str(err)) @@ -1134,24 +1132,70 @@ def get_metadata(trainingjob_name): status=response_code, mimetype=MIMETYPE_JSON) -@APP.route('/trainingjobs/featureGroup', methods=['POST']) +@APP.route('/featureGroup', methods=['POST']) @cross_origin() def create_feature_group(): + """ + Rest endpoint to create feature group + + Args in function: + NONE + + Args in json: + json with below fields are given: + featureGroupName: str + description + feature_list: str + feature names + enable_Dme: boolean + whether to enable dme + source_name: str + name of source + dbOrg: str + name of db org + bucket: str + bucket name + DmePort: str + DME port + DmeHost: str + DME Host + datalake_source: str + string indicating datalake source + token: str + token for the bucket + + Returns: + 1. For post request + json: + result : str + result message + status code: + HTTP status code 201 + 2. For put request + json: + result : str + result message + status code: + HTTP status code 200 + + Exceptions: + All exception are provided with exception message and HTTP status code.""" + api_response = {} response_code = status.HTTP_500_INTERNAL_SERVER_ERROR LOGGER.debug('feature Group Create request, ' + json.dumps(request.json)) try: json_data=request.json - (featureGroup_name, features, datalake_source, enable_Dme, DmeHost, DmePort, bucket, token, source_name, db_org)=check_featureGroup_data(json_data) + (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port, bucket, token, source_name,db_org)=check_featureGroup_data(json_data) # the features are stored in string format in the db, and has to be passed as list of feature to the dme. Hence the conversion. features_list = features.split(", ") - add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme, PS_DB_OBJ,DmeHost, DmePort, bucket, token, source_name,db_org ) - #todo : if the create_dme_job fails delete the feature group in the db. - if enable_Dme == True : - response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket, token, features_list, featureGroup_name, DmeHost, DmePort) + add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme, PS_DB_OBJ,dme_host, dme_port, bucket, token, source_name,db_org ) + if enable_Dme == True : + response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket, token, features_list, featureGroup_name, dme_host, dme_port) if response.status_code != 201: api_response={"Exception": "Cannot create dme job"} + delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name) response_code=status.HTTP_400_BAD_REQUEST else: api_response={"result": "Feature Group Created"} @@ -1160,7 +1204,8 @@ def create_feature_group(): api_response={"result": "Feature Group Created"} response_code =status.HTTP_200_OK except Exception as err: - err_msg = "Failed to create the feature Group" + delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name) + err_msg = "Failed to create the feature Group " api_response = {"Exception":err_msg} LOGGER.error(str(err)) @@ -1168,6 +1213,130 @@ def create_feature_group(): status=response_code, mimetype=MIMETYPE_JSON) +@APP.route('/featureGroup', methods=['GET']) +@cross_origin() +def get_feature_group(): + """ + Rest endpoint to fetch all the feature groups + + Args in function: none + Required Args in json: + no json required + + Returns: + json: + FeatureGroups: list + list of dictionaries. + dictionaries contains: + featuregroup_name: str + name of feature group + features: str + name of features + datalake: str + datalake + dme: boolean + whether to enable dme + + """ + LOGGER.debug("Request for getting all feature groups") + api_response={} + response_code=status.HTTP_500_INTERNAL_SERVER_ERROR + try: + result= get_feature_groups_db(PS_DB_OBJ) + feature_groups=[] + for res in result: + dict_data={ + "featuregroup_name": res[0], + "features": res[1], + "datalake": res[2], + "dme": res[3] + } + feature_groups.append(dict_data) + api_response={"featuregroups":feature_groups} + response_code=status.HTTP_200_OK + + except Exception as err: + api_response = {"Exception": str(err)} + LOGGER.error(str(err)) + return APP.response_class(response=json.dumps(api_response), + status=response_code, + mimetype=MIMETYPE_JSON) + +@APP.route('/featureGroup/', methods=['GET']) +@cross_origin() +def get_feature_group_by_name(featuregroup_name): + """ + Rest endpoint to fetch a feature group + + Args in function: + featuregroup_name: str + name of featuregroup_name. + + Returns: + json: + trainingjob: dict + dictionary contains + featuregroup_name: str + name of featuregroup + features: str + features + datalake: str + name of datalake + dme: str + whether dme enabled or not + dme_host: str + dme host + dme_port: str + dme_port + bucket: str + bucket name + token: str + token for the bucket + source_name: dict + source name + db_org: str + db org + status code: + HTTP status code 200 + + Exceptions: + all exception are provided with exception message and HTTP status code. + + """ + LOGGER.debug("Request for getting a feature group with name = "+ featuregroup_name) + api_response={} + response_code=status.HTTP_500_INTERNAL_SERVER_ERROR + try: + result= get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name) + feature_group=[] + if result: + for res in result: + features=res[1].split(", ") + dict_data={ + "featuregroup_name": res[0], + "features": features, + "datalake": res[2], + "dme": res[3], + "dme_host": res[4], + "dme_port": res[5], + "bucket":res[6], + "token":res[7], + "source_name":res[8], + "db_org":res[9] + } + feature_group.append(dict_data) + api_response={"featuregroup":feature_group} + response_code=status.HTTP_200_OK + else: + response_code=status.HTTP_404_NOT_FOUND + raise TMException("Failed to fetch feature group info from db") + except Exception as err: + api_response = {"Exception": str(err)} + LOGGER.error(str(err)) + return APP.response_class(response=json.dumps(api_response), + status=response_code, + mimetype=MIMETYPE_JSON) + def async_feature_engineering_status(): """