Support for dynamic change of data source (data-extraction) 72/11872/5
author    rajdeep11 <rajdeep.sin@samsung.com>
          Tue, 10 Oct 2023 11:35:09 +0000 (17:05 +0530)
committer rajdeep11 <rajdeep.sin@samsung.com>
          Mon, 23 Oct 2023 17:01:51 +0000 (22:31 +0530)
Issue-Id: AIMLFW-58

Change-Id: Ic957198c11795db24e8b221590d497a024f516e2
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
dataextraction/ConfigHelper.py
dataextraction/FeatureEngineeringFactory.py
dataextraction/main.py
dataextraction/source/InfluxSource.py
test/test_pipeline.py
tox.ini

index 2f1e2a5..47f8dc4 100644 (file)
@@ -246,3 +246,4 @@ class ConfigHelper(metaclass=Singleton):
         Function: Get FS Port
         """
         return self.fs_dict['fs_port']
+    
index 06dcbcb..96f440f 100644 (file)
@@ -24,6 +24,7 @@ import importlib
 from lru import LRU
 from ConfigHelper import ConfigHelper
 from Pipeline import Pipeline
+from source.InfluxSource import InfluxSource
 
 class FeatureEngineeringFactory():
     """
@@ -39,6 +40,7 @@ class FeatureEngineeringFactory():
         self.modulename = ""
         self.default_class = ""
         self.hash_table = LRU(10)
+        self.influxdb_dict = {}
         self.logger.debug("************Starting Init FeatureEngineeringFactory***********")
         try:
             self.spark_session = spark_manager
@@ -68,7 +70,12 @@ class FeatureEngineeringFactory():
             my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class)
             my_classinstance = my_class(my_classorflavour)
         self.logger.debug("Initialization of Class")
-        my_classinstance.init(self.spark_session, self.config_help, inputs_to_class)
+        # If the TM sent the InfluxDB info dict, initialize the source dynamically
+        if any(self.influxdb_dict.values()) and baseclass_name == "Source":
+            my_classinstance.init_dynamic(self.spark_session, self.config_help, inputs_to_class, self.influxdb_dict)
+        else:
+            # The TM did not send the InfluxDB dict; fall back to the static config
+            my_classinstance.init(self.spark_session, self.config_help, inputs_to_class)
         return my_classinstance
         
         
@@ -95,12 +102,13 @@ class FeatureEngineeringFactory():
         return my_instancelist
         
         
-    def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+    def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, influxdb_dict, caching_key):
         """
         @Function: Makes to get Batch Pipeline
         @ Input  : source Classess, Sink Classess, Transform Classess
         @Output  : Instance of Pipeline Object
         """
+        self.influxdb_dict= influxdb_dict
         if self.hash_table.get(caching_key) is None:
             self.logger.debug("Cached Instance Not Found, Creating Instance, Key" + caching_key)
             source_instancelist = None
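The dispatch above hinges on any(self.influxdb_dict.values()): only a dict carrying at least one non-empty value, combined with a "Source" base class, routes through init_dynamic(). A quick illustration of the guard (the filled values are made up for the example):

# Guard behaviour for the two payload shapes seen in this change
empty  = {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}
filled = dict(empty, host='influxdb.example.org', port='8086', token='my-token')

print(any(empty.values()))   # False -> static init() path
print(any(filled.values()))  # True  -> init_dynamic() path for Source classes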
index 915d1d6..6f9ed2b 100644 (file)
@@ -168,9 +168,10 @@ def async_code_worker():
             source_dict = request_json["source"]
             transform_dict = request_json["transform"]
             sink_dict = request_json["sink"]
+            influxdb_dict = request_json["influxdb_info"]
             c_key = str(source_dict)+str(transform_dict)+str(sink_dict)
             logger.debug(c_key)
-            feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key)
+            feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict, c_key)
             session = session_helper.get_session()
             feature_engineering_pipeline.load_data(session)
             feature_engineering_pipeline.transform_data(session)
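Callers of this endpoint must therefore now include an "influxdb_info" key in the request body. A representative request_json, with illustrative connection values (the key set is taken from the fixture in test/test_pipeline.py):

request_json = {
    "source":    {"InfluxSource": {"query": 'from(bucket:"UEData") |> range(start: 0, stop: now())'}},
    "transform": [{"operation": "SQLTransform", "FeatureList": "*", "SQLFilter": ""}],
    "sink":      {"CassandraSink": {"CollectionName": "last_check4"}},
    # Forwarded by the TM; all-empty strings keep the static init() path
    "influxdb_info": {"host": "influxdb.example.org", "port": "8086", "token": "my-token",
                      "source_name": "InfluxSource", "db_org": "primary", "bucket": "UEData"},
}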
index de04a2f..8779c4f 100644 (file)
@@ -64,6 +64,31 @@ class InfluxSource(Source):
             self.logger.debug("Set variable query"+ self.query)
         else:
             self.logger.error("Expected variable query"+ self.query)
+
+    def init_dynamic(self, sparkhelper, confighelper, inputdict, influxdb_dict):
+        """
+        @Method: init_dynamic
+        @Input : sparkhelper - Spark session helper
+                 confighelper - config helper
+                 inputdict - input dictionary
+                 influxdb_dict - InfluxDB connection info dictionary
+        @Output: None
+        """
+        self.logger  = confighelper.getLogger()
+        self.logger.debug("init_dynamic called")
+        classconfig  = confighelper.getClassConfig(self)
+        self.url     = f"http://{influxdb_dict['host']}:{influxdb_dict['port']}"
+        self.token   = influxdb_dict["token"]
+        self.org     = influxdb_dict["db_org"]
+        self.timeout = 100000
+        self.ssl     = False
+        self.query   = classconfig["Query"]
+        if "query" in inputdict:
+            self.query = inputdict["query"]
+            self.logger.debug("Set variable query " + self.query)
+        else:
+            self.logger.error("Expected variable query; using default " + self.query)
+
     def load(self,sparksession):
         """
         @Method: load
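The body of load() itself is unchanged by this patch and not shown here. For orientation only, a minimal sketch of how the attributes set by init_dynamic could drive a query, assuming the influxdb-client package and that sparksession exposes a Spark SparkSession (neither is confirmed by this diff):

# Hedged sketch only, not the repository's actual load() implementation
from influxdb_client import InfluxDBClient

def load(self, sparksession):
    client = InfluxDBClient(url=self.url, token=self.token,
                            org=self.org, timeout=self.timeout)
    pdf = client.query_api().query_data_frame(self.query)  # pandas DataFrame
    return sparksession.createDataFrame(pdf)               # hand off to Spark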
index e48e5b7..a1f3c7b 100644 (file)
@@ -45,13 +45,13 @@ class helper:
 
 class Test_Pipeline:
     def setup_method(self):
-        api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
+        api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}, 'influxdb_info': {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}}
         load_dotenv('test/test_env.env')
         os.environ['CODE_DIR_PATH'] = 'test'
         session_helper = SparkSessionManager()
         factory = FeatureEngineeringFactory(session_helper)
-        (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
-        self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+        (source_dict, transform_dict, sink_dict, influxdb_dict) = (api_json['source'], api_json['transform'], api_json['sink'], api_json['influxdb_info'])
+        self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
         self.spark_session = session_helper
 
     
diff --git a/tox.ini b/tox.ini
index dc7ca34..b072e56 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -39,6 +39,7 @@ deps=
   Flask-RESTful
   lru-dict
   PyYAML
+  Werkzeug==2.2.2
   
 
 commands =