PyYAML
kubernetes
validators==0.20.0
+Werkzeug==2.2.2
de_response.status_code = status.HTTP_200_OK
de_response.headers = {"content-type": "application/json"}
de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
+ db_result2 = [('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080', '', '', '', False, '', '', '')]
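# A sketch of the feature-group row that db_result2 mimics, matching the
# positional reads made further down in this diff (assumed layout):
#   [1] feature list, [3] host, [4] port, [5] bucket,
#   [6] token, [7] db_org, [11] source_name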
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
@patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
+ @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
@patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
- def test_training(self,mock1,mock2,mock3,mock4):
+ def test_training(self,mock1,mock2,mock3,mock4,mock5):
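# @patch decorators are applied bottom-up, so mock1 is
# change_steps_state_of_latest_version and the newly added mock5 is
# validate_trainingjob_name.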
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
expected_data = 'Data Pipeline Execution Completed'
response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
@patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result1)
+ @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
@patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response1)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
- def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4):
+ def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4,mock5):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
expected_data = 'Data Pipeline Execution Failed'
response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
query_filter = ""
datalake_source = {"InfluxSource": {}}
_measurement = "liveCell"
- bucket = "UEData"
-
+ influxdb_info_dict = {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}
try:
response = trainingmgr_operations.data_extraction_start(training_config_obj, trainingjob_name, feature_list,
- query_filter, datalake_source, _measurement, bucket)
+ query_filter, datalake_source, _measurement, influxdb_info_dict)
assert response.status_code == status.HTTP_200_OK
assert response.headers['content-type'] == MIMETYPE_JSON
except:
raise TMException('URL validation error: ' + url)
return url
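# A minimal sketch of the validation pattern behind the helper above, assuming
# it wraps validators.url (consistent with the validators pin in requirements).
# check_url is a hypothetical name; TMException comes from the project code:
import validators

def check_url(url):
    # validators.url returns a falsy ValidationFailure object on invalid input
    if not validators.url(url):
        raise TMException('URL validation error: ' + url)
    return url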
-def data_extraction_start(training_config_obj, trainingjob_name, feature_list, query_filter,
- datalake_source, _measurement, bucket):
+def data_extraction_start(training_config_obj, trainingjob_name, feature_list_str, query_filter,
+ datalake_source, _measurement, influxdb_info_dic):
"""
This function calls the data extraction module to extract data for the given
trainingjob_name and returns the response received from that module.
source['source'] = datalake_source
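# The branch below assembles a Flux query string; with illustrative values
# bucket="UEData" and _measurement="liveCell" (the values the old test
# hard-coded) it would read:
#   from(bucket:"UEData") |> range(start: 0, stop: now())
#     |> filter(fn: (r) => r._measurement == "liveCell") ...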
if 'InfluxSource' in datalake_source:
source['source']['InfluxSource']['query']='''from(bucket:"'''+\
- bucket + '''") |> '''+\
+ influxdb_info_dic["bucket"] + '''") |> '''+\
'''range(start: 0, stop: now()) '''+\
'''|> filter(fn: (r) => r._measurement == "'''+\
_measurement + '''") '''+\
transform['transform'] = []
transform_inner_dic = {}
transform_inner_dic['operation'] = "SQLTransform"
- transform_inner_dic['FeatureList'] = feature_list
+ transform_inner_dic['FeatureList'] = feature_list_str
transform_inner_dic['SQLFilter'] = query_filter
transform['transform'].append(transform_inner_dic)
dictionary.update(source)
dictionary.update(transform)
dictionary['sink'] = sink
+ dictionary['influxdb_info'] = influxdb_info_dic
logger.debug(json.dumps(dictionary))
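# For reference, a sketch of the payload logged above, with key names taken
# from this diff and placeholder values (sink is built elsewhere in the file):
#   {
#     "source": {"InfluxSource": {"query": "from(bucket:\"...\") |> ..."}},
#     "transform": [{"operation": "SQLTransform",
#                    "FeatureList": feature_list_str,
#                    "SQLFilter": query_filter}],
#     "sink": {...},
#     "influxdb_info": influxdb_info_dic
#   }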
else:
db_results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
- feature_list = db_results[0][2]
+ featuregroup_name = db_results[0][2]
+ result = get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
+ feature_list_string = result[0][1]
+ influxdb_info_dic = {}
+ influxdb_info_dic["host"] = result[0][3]
+ influxdb_info_dic["port"] = result[0][4]
+ influxdb_info_dic["bucket"] = result[0][5]
+ influxdb_info_dic["token"] = result[0][6]
+ influxdb_info_dic["db_org"] = result[0][7]
+ influxdb_info_dic["source_name"] = result[0][11]
query_filter = db_results[0][6]
datalake_source = json.loads(db_results[0][14])['datalake_source']
_measurement = db_results[0][17]
- bucket = db_results[0][18]
-
LOGGER.debug('Starting Data Extraction...')
de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, trainingjob_name,
- feature_list, query_filter, datalake_source,
- _measurement, bucket)
+ feature_list_string, query_filter, datalake_source,
+ _measurement, influxdb_info_dic)
if (de_response.status_code == status.HTTP_200_OK):
LOGGER.debug("Response from data extraction for " + \
trainingjob_name + " : " + json.dumps(de_response.json()))