From ff725db7f4c62d5071b4a757917276d2766ceb2d Mon Sep 17 00:00:00 2001 From: rajdeep11 Date: Tue, 10 Oct 2023 17:05:09 +0530 Subject: [PATCH] Support for dynamic change of data source(data-extraction) Issue-Id: AIMLFW-58 Change-Id: Ic957198c11795db24e8b221590d497a024f516e2 Signed-off-by: rajdeep11 --- dataextraction/ConfigHelper.py | 1 + dataextraction/FeatureEngineeringFactory.py | 12 ++++++++++-- dataextraction/main.py | 3 ++- dataextraction/source/InfluxSource.py | 25 +++++++++++++++++++++++++ test/test_pipeline.py | 6 +++--- tox.ini | 1 + 6 files changed, 42 insertions(+), 6 deletions(-) diff --git a/dataextraction/ConfigHelper.py b/dataextraction/ConfigHelper.py index 2f1e2a5..47f8dc4 100644 --- a/dataextraction/ConfigHelper.py +++ b/dataextraction/ConfigHelper.py @@ -246,3 +246,4 @@ class ConfigHelper(metaclass=Singleton): Function: Get FS Port """ return self.fs_dict['fs_port'] + diff --git a/dataextraction/FeatureEngineeringFactory.py b/dataextraction/FeatureEngineeringFactory.py index 06dcbcb..96f440f 100644 --- a/dataextraction/FeatureEngineeringFactory.py +++ b/dataextraction/FeatureEngineeringFactory.py @@ -24,6 +24,7 @@ import importlib from lru import LRU from ConfigHelper import ConfigHelper from Pipeline import Pipeline +from source.InfluxSource import InfluxSource class FeatureEngineeringFactory(): """ @@ -39,6 +40,7 @@ class FeatureEngineeringFactory(): self.modulename = "" self.default_class = "" self.hash_table = LRU(10) + self.influxdb_dict={} self.logger.debug("************Starting Init FeatureEngineeringFactory***********") try: self.spark_session = spark_manager @@ -68,7 +70,12 @@ class FeatureEngineeringFactory(): my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class) my_classinstance = my_class(my_classorflavour) self.logger.debug("Initialization of Class") - my_classinstance.init(self.spark_session, self.config_help, inputs_to_class) + # if the tm sends the influx_db info dict + if 
any(self.influxdb_dict.values()) and baseclass_name=="Source": + my_classinstance.init_dynamic(self.spark_session, self.config_help, inputs_to_class, self.influxdb_dict) + else : + #when the tm does not send the influx_db dict + my_classinstance.init(self.spark_session, self.config_help, inputs_to_class) return my_classinstance @@ -95,12 +102,13 @@ class FeatureEngineeringFactory(): return my_instancelist - def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key): + def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, influxdb_dict, caching_key): """ @Function: Makes to get Batch Pipeline @ Input : source Classess, Sink Classess, Transform Classess @Output : Instance of Pipeline Object """ + self.influxdb_dict= influxdb_dict if self.hash_table.get(caching_key) is None: self.logger.debug("Cached Instance Not Found, Creating Instance, Key" + caching_key) source_instancelist = None diff --git a/dataextraction/main.py b/dataextraction/main.py index 915d1d6..6f9ed2b 100644 --- a/dataextraction/main.py +++ b/dataextraction/main.py @@ -168,9 +168,10 @@ def async_code_worker(): source_dict = request_json["source"] transform_dict = request_json["transform"] sink_dict = request_json["sink"] + influxdb_dict= request_json["influxdb_info"] c_key = str(source_dict)+str(transform_dict)+str(sink_dict) logger.debug(c_key) - feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key) + feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict, c_key) session = session_helper.get_session() feature_engineering_pipeline.load_data(session) feature_engineering_pipeline.transform_data(session) diff --git a/dataextraction/source/InfluxSource.py b/dataextraction/source/InfluxSource.py index de04a2f..8779c4f 100644 --- a/dataextraction/source/InfluxSource.py +++ b/dataextraction/source/InfluxSource.py @@ -64,6 +64,31 @@ 
class InfluxSource(Source): self.logger.debug("Set variable query"+ self.query) else: self.logger.error("Expected variable query"+ self.query) + + def init_dynamic(self, sparkhelper, confighelper, inputdict, influxdb_dict): + """ + @Method: init_dynamic + @Input : spark - Spark + confighelper - config helper + inputdict - input dictionary + influx data source info - input dictionary + @Output: None + """ + self.logger = confighelper.getLogger() + self.logger.debug("the init_dynamic is called") + classconfig = confighelper.getClassConfig(self) + self.url = f"http://{influxdb_dict['host']}:{influxdb_dict['port']}" + self.token = influxdb_dict["token"] + self.org = influxdb_dict["db_org"] + self.timeout = 100000 + self.ssl = False + self.query = classconfig["Query"] + if "query" in inputdict.keys(): + self.query = inputdict["query"] + self.logger.debug("Set variable query"+ self.query) + else: + self.logger.error("Expected variable query"+ self.query) + def load(self,sparksession): """ @Method: init diff --git a/test/test_pipeline.py b/test/test_pipeline.py index e48e5b7..a1f3c7b 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -45,13 +45,13 @@ class helper: class Test_Pipeline: def setup_method(self): - api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}} + api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}, 
'influxdb_info': {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}} load_dotenv('test/test_env.env') os.environ['CODE_DIR_PATH'] = 'test' session_helper = SparkSessionManager() factory = FeatureEngineeringFactory(session_helper) - (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink']) - self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict)) + (source_dict, transform_dict, sink_dict, influxdb_dict) = (api_json['source'], api_json['transform'], api_json['sink'], api_json['influxdb_info']) + self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict,str(source_dict) + str(transform_dict) + str(sink_dict)) self.spark_session = session_helper diff --git a/tox.ini b/tox.ini index dc7ca34..b072e56 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,7 @@ deps= Flask-RESTful lru-dict PyYAML + Werkzeug==2.2.2 commands = -- 2.16.6