From: rajdeep11 Date: Thu, 12 Dec 2024 10:41:42 +0000 (+0530) Subject: fixing the dataextraction start X-Git-Tag: 3.0.0~18 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F52%2F13852%2F1;p=aiml-fw%2Fawmf%2Ftm.git fixing the dataextraction start Change-Id: I3e2dc92b6cf9bbdfc48a0ede0960bc3228771e90 Signed-off-by: rajdeep11 --- diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py index ed21789..d79e2c1 100644 --- a/trainingmgr/common/trainingmgr_operations.py +++ b/trainingmgr/common/trainingmgr_operations.py @@ -41,14 +41,14 @@ def create_url_host_port(protocol, host, port, path=''): raise TMException('URL validation error: '+ url) return url -def data_extraction_start(training_config_obj, featuregroup_name, feature_list_str, query_filter, - datalake_source, _measurement, influxdb_info_dic, training_job_id): +def data_extraction_start(training_config_obj, training_job_id, feature_list_str, query_filter, + datalake_source, _measurement, influxdb_info_dic, featuregroup_name): """ This function calls data extraction module for data extraction of trainingjob_name training and returns response which we is gotten by calling data extraction module. """ logger = training_config_obj.logger - logger.debug('training manager is calling data extraction for '+ featuregroup_name) + logger.debug('training manager is calling data extraction for trainingjob_id '+ str(training_job_id)) data_extraction_ip = training_config_obj.data_extraction_ip data_extraction_port = training_config_obj.data_extraction_port url = 'http://'+str(data_extraction_ip)+':'+str(data_extraction_port)+'/feature-groups' #NOSONAR @@ -76,7 +76,8 @@ def data_extraction_start(training_config_obj, featuregroup_name, feature_list_s sink = {} sink_inner_dic = {} - sink_inner_dic['CollectionName'] = featuregroup_name + # sink_inner_dic['CollectionName'] = str(training_job_id) + sink_inner_dic['CollectionName'] = featuregroup_name + "_" + str(training_job_id) sink['CassandraSink'] = sink_inner_dic dictionary = {} @@ -84,7 +85,7 @@ def data_extraction_start(training_config_obj, featuregroup_name, feature_list_s dictionary.update(transform) dictionary['sink'] = sink dictionary['influxdb_info']= influxdb_info_dic - dictionary["trainingjob_id"] = training_job_id + dictionary["trainingjob_id"] = str(training_job_id) logger.debug(json.dumps(dictionary))