From 0ba4f66932dba23b95d4af8ab2fc0dcb67a19509 Mon Sep 17 00:00:00 2001
From: rajdeep11
Date: Tue, 10 Oct 2023 16:47:35 +0530
Subject: [PATCH] Support for dynamic change of data source

Issue-Id: AIMLFW-58
Change-Id: I20820cf08d1d7bfc1d6d0fc1a077377faa7bf7a4
Signed-off-by: rajdeep11
---
 requirements.txt                             |  1 +
 tests/test_tm_apis.py                        |  7 +++++--
 tests/test_trainingmgr_operations.py         |  5 ++---
 trainingmgr/common/trainingmgr_operations.py |  9 +++++----
 trainingmgr/trainingmgr_main.py              | 17 ++++++++++++-----
 5 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 63c209a..84f48ba 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,4 @@ pandas
 PyYAML
 kubernetes
 validators==0.20.0
+Werkzeug==2.2.2
diff --git a/tests/test_tm_apis.py b/tests/test_tm_apis.py
index 5292a39..39b1564 100644
--- a/tests/test_tm_apis.py
+++ b/tests/test_tm_apis.py
@@ -425,12 +425,14 @@ class Test_training_main:
     de_response.status_code = status.HTTP_200_OK
     de_response.headers={"content-type": "application/json"}
     de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
+    db_result2=[('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080', '', '', '', False, '', '', '')]
 
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
+    @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
     @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
     @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
-    def test_training(self,mock1,mock2,mock3,mock4):
+    def test_training(self,mock1,mock2,mock3,mock4, mock5):
         trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
         expected_data = 'Data Pipeline Execution Completed"'
         response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
@@ -453,9 +455,10 @@ class Test_training_main:
 
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result1)
+    @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
     @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response1)
     @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
-    def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4):
+    def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4, mock5):
         trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
         expected_data = 'Data Pipeline Execution Failed'
         response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
diff --git a/tests/test_trainingmgr_operations.py b/tests/test_trainingmgr_operations.py
index eaf93f3..1b885dc 100644
--- a/tests/test_trainingmgr_operations.py
+++ b/tests/test_trainingmgr_operations.py
@@ -63,11 +63,10 @@ class Test_data_extraction_start:
         query_filter = ""
         datalake_source = {"InfluxSource": {}}
         _measurement = "liveCell"
-        bucket = "UEData"
-
+        influxdb_info_dict={'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}
         try:
             response = trainingmgr_operations.data_extraction_start(training_config_obj, trainingjob_name, feature_list,
-                                                                    query_filter, datalake_source, _measurement, bucket)
+                                                                    query_filter, datalake_source, _measurement, influxdb_info_dict)
             assert response.status_code == status.HTTP_200_OK
             assert response.headers['content-type'] == MIMETYPE_JSON
         except:
diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py
index 3f35d21..e7b2bd0 100644
--- a/trainingmgr/common/trainingmgr_operations.py
+++ b/trainingmgr/common/trainingmgr_operations.py
@@ -37,8 +37,8 @@ def create_url_host_port(protocol, host, port, path=''):
         raise TMException('URL validation error: '+ url)
     return url
 
-def data_extraction_start(training_config_obj, trainingjob_name, feature_list, query_filter,
-                          datalake_source, _measurement, bucket):
+def data_extraction_start(training_config_obj, trainingjob_name, feature_list_str, query_filter,
+                          datalake_source, _measurement, influxdb_info_dic):
     """
     This function calls data extraction module for data extraction of trainingjob_name training
     and returns response which we is gotten by calling data extraction module.
@@ -54,7 +54,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     source['source'] = datalake_source
     if 'InfluxSource' in datalake_source:
         source['source']['InfluxSource']['query']='''from(bucket:"'''+\
-            bucket + '''") |> '''+\
+            influxdb_info_dic["bucket"] + '''") |> '''+\
             '''range(start: 0, stop: now()) '''+\
             '''|> filter(fn: (r) => r._measurement == "'''+\
             _measurement + '''") '''+\
@@ -66,7 +66,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     transform['transform'] = []
     transform_inner_dic = {}
     transform_inner_dic['operation'] = "SQLTransform"
-    transform_inner_dic['FeatureList'] = feature_list
+    transform_inner_dic['FeatureList'] = feature_list_str
     transform_inner_dic['SQLFilter'] = query_filter
     transform['transform'].append(transform_inner_dic)
 
@@ -79,6 +79,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     dictionary.update(source)
     dictionary.update(transform)
     dictionary['sink'] = sink
+    dictionary['influxdb_info']= influxdb_info_dic
 
     logger.debug(json.dumps(dictionary))
 
diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py
index fa6b451..47d9405 100644
--- a/trainingmgr/trainingmgr_main.py
+++ b/trainingmgr/trainingmgr_main.py
@@ -329,16 +329,23 @@ def training(trainingjob_name):
     else:
         db_results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
 
-        feature_list = db_results[0][2]
+        featuregroup_name = db_results[0][2]
+        result= get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
+        feature_list_string = result[0][1]
+        influxdb_info_dic={}
+        influxdb_info_dic["host"]=result[0][3]
+        influxdb_info_dic["port"]=result[0][4]
+        influxdb_info_dic["bucket"]=result[0][5]
+        influxdb_info_dic["token"]=result[0][6]
+        influxdb_info_dic["db_org"] = result[0][7]
+        influxdb_info_dic["source_name"]= result[0][11]
         query_filter = db_results[0][6]
         datalake_source = json.loads(db_results[0][14])['datalake_source']
         _measurement = db_results[0][17]
-        bucket = db_results[0][18]
-
         LOGGER.debug('Starting Data Extraction...')
         de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, trainingjob_name,
-                                            feature_list, query_filter, datalake_source,
-                                            _measurement, bucket)
+                                            feature_list_string, query_filter, datalake_source,
+                                            _measurement, influxdb_info_dic)
         if (de_response.status_code == status.HTTP_200_OK ):
             LOGGER.debug("Response from data extraction for " + \
                 trainingjob_name + " : " + json.dumps(de_response.json()))
-- 
2.16.6
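Reviewer note (below the signature, so git-am ignores it): a minimal standalone sketch of what this change does, assuming the feature-group row layout implied by the column indices in trainingmgr_main.py and the db_result2 fixture above. The helper names build_influxdb_info and build_flux_query_prefix are illustrative only, not functions in this repo.

# Sketch only, not part of the patch. Assumes row indices as used in
# trainingmgr_main.py above: 1 = feature list, 3 = host, 4 = port,
# 5 = bucket, 6 = token, 7 = db_org, 11 = source_name.

def build_influxdb_info(feature_group_row):
    """Map one feature-group DB row onto the influxdb_info dict that
    data_extraction_start() now receives instead of a bare bucket name."""
    return {
        "host": feature_group_row[3],
        "port": feature_group_row[4],
        "bucket": feature_group_row[5],
        "token": feature_group_row[6],
        "db_org": feature_group_row[7],
        "source_name": feature_group_row[11],
    }

# Example with the row shape used by db_result2 in tests/test_tm_apis.py:
row = ('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080',
       '', '', '', False, '', '', '')
info = build_influxdb_info(row)
assert info["host"] == "127.0.0.21" and info["port"] == "8080"

def build_flux_query_prefix(influxdb_info, measurement):
    """Reproduce the start of the Flux query assembled in
    data_extraction_start(); the bucket now comes from the dict."""
    return ('from(bucket:"' + influxdb_info["bucket"] + '") |> '
            'range(start: 0, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "' + measurement + '") ')
    # (the remainder of the query string lies outside the hunks shown above)

The net effect is that the InfluxDB connection details (host, port, token, org, bucket, source name) travel with each data extraction request via dictionary['influxdb_info'], instead of being fixed at deployment time, which is what enables switching the data source per feature group.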