raise TMException('URL validation error: '+ url)
return url
-def data_extraction_start(training_config_obj, featuregroup_name, feature_list_str, query_filter,
- datalake_source, _measurement, influxdb_info_dic, training_job_id):
+def data_extraction_start(training_config_obj, training_job_id, feature_list_str, query_filter,
+ datalake_source, _measurement, influxdb_info_dic, featuregroup_name):
"""
This function calls data extraction module for data extraction of trainingjob_name training and
returns response which we is gotten by calling data extraction module.
"""
logger = training_config_obj.logger
- logger.debug('training manager is calling data extraction for '+ featuregroup_name)
+ logger.debug('training manager is calling data extraction for trainingjob_id '+ str(training_job_id))
data_extraction_ip = training_config_obj.data_extraction_ip
data_extraction_port = training_config_obj.data_extraction_port
url = 'http://'+str(data_extraction_ip)+':'+str(data_extraction_port)+'/feature-groups' #NOSONAR
sink = {}
sink_inner_dic = {}
- sink_inner_dic['CollectionName'] = featuregroup_name
+ # sink_inner_dic['CollectionName'] = str(training_job_id)
+ sink_inner_dic['CollectionName'] = featuregroup_name + "_" + str(training_job_id)
sink['CassandraSink'] = sink_inner_dic
dictionary = {}
dictionary.update(transform)
dictionary['sink'] = sink
dictionary['influxdb_info']= influxdb_info_dic
- dictionary["trainingjob_id"] = training_job_id
+ dictionary["trainingjob_id"] = str(training_job_id)
logger.debug(json.dumps(dictionary))