Support for dynamic change of data source 71/11871/2
author rajdeep11 <rajdeep.sin@samsung.com>
Tue, 10 Oct 2023 11:17:35 +0000 (16:47 +0530)
committer rajdeep11 <rajdeep.sin@samsung.com>
Wed, 11 Oct 2023 11:01:45 +0000 (16:31 +0530)
Issue-Id: AIMLFW-58

Change-Id: I20820cf08d1d7bfc1d6d0fc1a077377faa7bf7a4
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
requirements.txt
tests/test_tm_apis.py
tests/test_trainingmgr_operations.py
trainingmgr/common/trainingmgr_operations.py
trainingmgr/trainingmgr_main.py

diff --git a/requirements.txt b/requirements.txt
index 63c209a..84f48ba 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,4 @@ pandas
 PyYAML
 kubernetes
 validators==0.20.0
+Werkzeug==2.2.2
diff --git a/tests/test_tm_apis.py b/tests/test_tm_apis.py
index 5292a39..39b1564 100644
--- a/tests/test_tm_apis.py
+++ b/tests/test_tm_apis.py
@@ -425,12 +425,14 @@ class Test_training_main:
     de_response.status_code = status.HTTP_200_OK
     de_response.headers={"content-type": "application/json"}
     de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
+    db_result2 = [('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080', '', '', '', False, '', '', '')]
 
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
+    @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
     @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
     @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
-    def test_training(self,mock1,mock2,mock3,mock4):
+    def test_training(self, mock1, mock2, mock3, mock4, mock5):
         trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
         expected_data = 'Data Pipeline Execution Completed"'
         response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
@@ -453,9 +455,10 @@ class Test_training_main:
 
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result1)
+    @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
     @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response1)
     @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
-    def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4):
+    def test_training_negative_de_failed(self, mock1, mock2, mock3, mock4, mock5):
         trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
         expected_data = 'Data Pipeline Execution Failed'
         response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
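
The new get_feature_group_by_name_db patch gives each test a fifth mock. Stacked @patch decorators are applied bottom-up, so the decorator nearest the function supplies the first positional mock; mock5 therefore corresponds to the topmost patch, validate_trainingjob_name. A minimal, self-contained sketch of this ordering (the helper names are illustrative, not from this patch):

    from unittest.mock import patch

    def helper_a():
        raise RuntimeError("real helper_a called")

    def helper_b():
        raise RuntimeError("real helper_b called")

    # Outermost decorator -> last mock argument; innermost -> first.
    @patch(__name__ + ".helper_b", return_value=True)
    @patch(__name__ + ".helper_a", return_value=None)
    def test_ordering(mock_a, mock_b):
        assert helper_a() is None   # served by mock_a (innermost patch)
        assert helper_b() is True   # served by mock_b (outermost patch)
        assert mock_a.called and mock_b.called

    test_ordering()
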
diff --git a/tests/test_trainingmgr_operations.py b/tests/test_trainingmgr_operations.py
index eaf93f3..1b885dc 100644
--- a/tests/test_trainingmgr_operations.py
+++ b/tests/test_trainingmgr_operations.py
@@ -63,11 +63,10 @@ class Test_data_extraction_start:
         query_filter = ""
         datalake_source = {"InfluxSource": {}}
         _measurement = "liveCell"
-        bucket = "UEData"
-
+        influxdb_info_dict = {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}
         try:
             response = trainingmgr_operations.data_extraction_start(training_config_obj, trainingjob_name, feature_list,
-                                                                    query_filter, datalake_source, _measurement, bucket)
+                                                                    query_filter, datalake_source, _measurement, influxdb_info_dict)
             assert response.status_code == status.HTTP_200_OK
             assert response.headers['content-type'] == MIMETYPE_JSON
         except:
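
The test above exercises the new signature with empty fields. For orientation, a populated influxdb_info_dict would look like the sketch below; the keys are the ones the test uses, while the concrete values are assumptions drawn from fixtures elsewhere in this change:

    # Illustrative values only; the keys match those used in the test above.
    influxdb_info_dict = {
        "host": "127.0.0.21",
        "port": "8080",
        "token": "example-token",      # assumption: tokens travel as strings
        "source_name": "InfluxSource",
        "db_org": "primary",           # assumed org name
        "bucket": "UEData",
    }
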
diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py
index 3f35d21..e7b2bd0 100644
--- a/trainingmgr/common/trainingmgr_operations.py
+++ b/trainingmgr/common/trainingmgr_operations.py
@@ -37,8 +37,8 @@ def create_url_host_port(protocol, host, port, path=''):
         raise TMException('URL validation error: '+ url)
     return url
 
-def data_extraction_start(training_config_obj, trainingjob_name, feature_list, query_filter,
-                          datalake_source, _measurement, bucket):
+def data_extraction_start(training_config_obj, trainingjob_name, feature_list_str, query_filter,
+                          datalake_source, _measurement, influxdb_info_dic):
     """
     This function starts data extraction for the trainingjob_name training job by calling the
     data extraction module and returns the response received from it.
@@ -54,7 +54,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     source['source'] = datalake_source
     if 'InfluxSource' in datalake_source:
         source['source']['InfluxSource']['query']='''from(bucket:"'''+\
-                                                  bucket + '''") |> '''+\
+                                                  influxdb_info_dic["bucket"] + '''") |> '''+\
                                                   '''range(start: 0, stop: now()) '''+\
                                                   '''|> filter(fn: (r) => r._measurement == "'''+\
                                                   _measurement + '''") '''+\
@@ -66,7 +66,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     transform['transform'] = []
     transform_inner_dic = {}
     transform_inner_dic['operation'] = "SQLTransform"
-    transform_inner_dic['FeatureList'] = feature_list
+    transform_inner_dic['FeatureList'] = feature_list_str
     transform_inner_dic['SQLFilter'] = query_filter
     transform['transform'].append(transform_inner_dic)
 
@@ -79,6 +79,7 @@ def data_extraction_start(training_config_obj, trainingjob_name, feature_list, q
     dictionary.update(source)
     dictionary.update(transform)
     dictionary['sink'] = sink
+    dictionary['influxdb_info'] = influxdb_info_dic
    
     logger.debug(json.dumps(dictionary))
 
diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py
index fa6b451..47d9405 100644
--- a/trainingmgr/trainingmgr_main.py
+++ b/trainingmgr/trainingmgr_main.py
@@ -329,16 +329,23 @@ def training(trainingjob_name):
         else:
 
             db_results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
-            feature_list = db_results[0][2]
+            featuregroup_name = db_results[0][2]
+            result = get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
+            feature_list_string = result[0][1]
+            influxdb_info_dic = {}
+            influxdb_info_dic["host"] = result[0][3]
+            influxdb_info_dic["port"] = result[0][4]
+            influxdb_info_dic["bucket"] = result[0][5]
+            influxdb_info_dic["token"] = result[0][6]
+            influxdb_info_dic["db_org"] = result[0][7]
+            influxdb_info_dic["source_name"] = result[0][11]
             query_filter = db_results[0][6]
             datalake_source = json.loads(db_results[0][14])['datalake_source']
             _measurement = db_results[0][17]
-            bucket = db_results[0][18]
-
             LOGGER.debug('Starting Data Extraction...')
             de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, trainingjob_name,
-                                        feature_list, query_filter, datalake_source,
-                                        _measurement, bucket)
+                                         feature_list_string, query_filter, datalake_source,
+                                         _measurement, influxdb_info_dic)
             if (de_response.status_code == status.HTTP_200_OK ):
                 LOGGER.debug("Response from data extraction for " + \
                         trainingjob_name + " : " + json.dumps(de_response.json()))
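
The index-based unpacking above depends on the feature-group row layout (host at index 3, port at 4, bucket at 5, token at 6, db_org at 7, source_name at 11). A hypothetical helper, not part of this patch, that names that mapping explicitly:

    # Hypothetical helper; column indices taken from the diff above.
    def feature_group_row_to_influxdb_info(row):
        """Map one feature-group DB row (a tuple) to the influxdb_info dict
        expected by data_extraction_start."""
        return {
            "host": row[3],
            "port": row[4],
            "bucket": row[5],
            "token": row[6],
            "db_org": row[7],
            "source_name": row[11],
        }

    # Example with the tuple used in the tests above:
    row = ('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080',
           '', '', '', False, '', '', '')
    print(feature_group_row_to_influxdb_info(row))
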