Feature engineering pipeline changes 06/9306/1
authorjosephthaliath <jo.thaliath@samsung.com>
Tue, 18 Oct 2022 09:12:22 +0000 (14:42 +0530)
committerjosephthaliath <jo.thaliath@samsung.com>
Tue, 18 Oct 2022 09:13:40 +0000 (14:43 +0530)
Issue-Id: AIMLFW-2

Signed-off-by: josephthaliath <jo.thaliath@samsung.com>
Change-Id: I01111d4d65a72ef9b95b4c801d11f3f3a6122ebe

dataextraction/FeatureEngineeringFactory.py [new file with mode: 0644]
dataextraction/Pipeline.py [new file with mode: 0644]
dataextraction/SparkConfig.ini [new file with mode: 0644]
dataextraction/SparkHelper.py [new file with mode: 0644]

diff --git a/dataextraction/FeatureEngineeringFactory.py b/dataextraction/FeatureEngineeringFactory.py
new file mode 100644 (file)
index 0000000..094d5e3
--- /dev/null
@@ -0,0 +1,118 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+    @Module: Factory to create and return instances of Pipeline  
+"""
+import importlib
+# pylint: disable=E0611
+from lru import LRU
+from ConfigHelper import ConfigHelper
+from Pipeline import Pipeline
+
+class FeatureEngineeringFactory():
+    """
+    @Class: Factory to create and return instances of Pipeline
+    """
+    def __init__(self, spark_manager):
+        """
+        @ Constructor
+        @ Input: spark_manager
+        """
+        self.config_help = ConfigHelper()
+        self.logger = self.config_help.getLogger()
+        self.modulename = ""
+        self.default_class = ""
+        self.hash_table = LRU(10)
+        self.logger.debug("************Starting Init FeatureEngineeringFactory***********")
+        try:
+            self.spark_session = spark_manager
+        except Exception as exc:
+            raise Exception('Spark session Error').with_traceback(exc.__traceback__)
+    def create_instance(self, baseclass_name, my_classorflavour, inputs_to_class):
+        """
+        @Function: Makes  Instances
+        @ Input  : BaseClass Name & ClassName/FlavourName & Inputs to Class
+        @Output  : Class Instance
+        """
+        self.logger.info("BaseClassName "+ baseclass_name + " Class Flavor "+ my_classorflavour)
+        default = self.config_help.isDefault(baseclass_name, my_classorflavour)
+        self.modulename = self.config_help.getModuleName(baseclass_name)
+        self.default_class = self.config_help.getDefaultClassName(baseclass_name)
+        self.logger.debug("Instantiation of module " + self.modulename)
+        self.logger.debug("Default Class " + self.default_class)
+        self.logger.debug("Class OR Flavour " + my_classorflavour)
+        if not default:
+            import_class = self.modulename+"."+ my_classorflavour
+            self.logger.debug("Class to be imported " + import_class)
+            my_class = getattr(importlib.import_module(import_class), my_classorflavour)
+            my_classinstance = my_class(my_classorflavour)
+        elif default:
+            self.logger.debug("Module Name "+self.modulename)
+            self.logger.debug("default Class Name "+self.default_class)
+            my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class)
+            my_classinstance = my_class(my_classorflavour)
+        self.logger.debug("Initialization of Class")
+        my_classinstance.init(self.spark_session, self.config_help, inputs_to_class)
+        return my_classinstance
+        
+        
+    def __makebulk(self, baseclass_name, class_dictionary):
+        """
+        @Function: Makes Bulk Instances
+        @ Input  : BaseClass Name & Dictionay of Classess
+        @Output  : List of outputs
+        """
+        my_instancelist = []
+        self.modulename = self.config_help.getModuleName(baseclass_name)
+        self.default_class = self.config_help.getDefaultClassName(baseclass_name)
+        if baseclass_name != "Transform":
+            for my_class in class_dictionary:
+                my_instancelist.append(self.create_instance(baseclass_name, my_class, class_dictionary[my_class]))
+                self.logger.debug("Created instance for Source/Sink..")
+        elif baseclass_name == "Transform":
+            for value in class_dictionary:
+                my_class = value["operation"]
+                inputdict = value
+                inputdict.pop("operation")
+                self.logger.debug("Instanciating Base Class "+baseclass_name+" My Class "+my_class)
+                my_instancelist.append(self.create_instance(baseclass_name, my_class, inputdict))
+        return my_instancelist
+        
+        
+    def getBatchPipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+        """
+        @Function: Makes to get Batch Pipeline
+        @ Input  : source Classess, Sink Classess, Transform Classess
+        @Output  : Instance of Pipeline Object
+        """
+        if self.hash_table.get(caching_key) is None:
+            self.logger.debug("Cached Instance Not Found, Creating Instance, Key" + caching_key)
+            source_instancelist = None
+            transformer_instancelist = None
+            sink_instancelist = None
+            
+            source_instancelist = self.__makebulk("Source", source_classdict)
+            
+            if transform_classdict is not None:
+                transformer_instancelist = self.__makebulk("Transform", transform_classdict)
+            if sink_classdict is not None:
+                sink_instancelist = self.__makebulk("Sink", sink_classdict)
+                
+            self.hash_table[caching_key] = Pipeline(source_instancelist, transformer_instancelist, sink_instancelist)
+        return self.hash_table[caching_key]
diff --git a/dataextraction/Pipeline.py b/dataextraction/Pipeline.py
new file mode 100644 (file)
index 0000000..c7bdeaa
--- /dev/null
@@ -0,0 +1,67 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module : Feature Engineering Pipeline
+"""
+from ConfigHelper import ConfigHelper
+
+class Pipeline():
+    """
+    @Class: Feature Engineering Pipeline
+    """
+    def __init__(self, sources, transformers, sinks):
+        """
+        @Constructor
+        """
+        self.logger = ConfigHelper().logger
+        self.sources = sources
+        self.transformers = transformers
+        self.sinks = sinks
+        self.logger.debug("Pipeline Created")
+        self.spark_dflist = None
+        self.transformed_df = None
+    def loadData(self, session):
+        """
+        @Function: Loads data from source
+        """
+        self.logger.info("Source:" + str(self.sources[0]))
+        self.spark_dflist = self.sources[0].load(session)
+        self.logger.info("Data Load Completed")
+    def transformData(self, session):
+        """
+        @Function : Transform Data
+        """
+        if self.transformers is None:
+            self.transformed_df = self.spark_dflist
+        else:
+            self.transformed_df = self.transformers[0].transform(session, self.spark_dflist)
+        self.logger.info("Data Transform Completed")
+    def writeData(self, session):
+        """
+        @Function: Write Data
+        """
+        self.sinks[0].write(session, self.transformed_df)
+        self.logger.info("Data Written to Sink")
+    def execute(self, session):
+        """
+        @Function : Execute Pipeline
+        """
+        self.loadData(session)
+        self.transformData(session)
+        self.writeData(session)
diff --git a/dataextraction/SparkConfig.ini b/dataextraction/SparkConfig.ini
new file mode 100644 (file)
index 0000000..7ee005b
--- /dev/null
@@ -0,0 +1,6 @@
+[BaseConfig]
+DefaultAppName=Pipeline
+DefaultMaster=local[1]
+Override_Log_Level=ERROR
+[ExtraConfig]
+spark.logConf=true
diff --git a/dataextraction/SparkHelper.py b/dataextraction/SparkHelper.py
new file mode 100644 (file)
index 0000000..30676c9
--- /dev/null
@@ -0,0 +1,84 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module:Spark Session Manager
+"""
+import configparser
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+from ConfigHelper import ConfigHelper
+
+class sparkSessionManager():
+    """
+    @Module:Spark Session Manager
+    """
+    def __init__(self):
+        """
+        @Constructor:Spark Session Manager
+        """
+        config = configparser.ConfigParser()
+        confighelp = ConfigHelper()
+        self.logger = confighelp.getLogger()
+        self.loglevel = confighelp.log_level
+        code_path = confighelp.exec_path
+        if code_path is None:
+            config.read("SparkConfig.ini")
+        else:
+            config.read(code_path + "/SparkConfig.ini")
+        base = config['BaseConfig']
+        self.appname = base["DefaultAppName"]
+        self.master = base["DefaultMaster"]
+        if base["Override_Log_Level"] is not None:
+            self.loglevel = base["Override_Log_Level"]
+        self.sconf = SparkConf()
+        self.addtionalconfig = config["ExtraConfig"]
+    def addConf(self, key, value):
+        """
+        @Function: Adding configuration as runtime
+        """
+        self.sconf.set(key, value)
+        
+    def getAllConf(self):
+        self.logger.debug("*********** ALL CONF *** " + str(self.sconf.getAll()))
+    
+    def getSession(self):
+        """
+        @Function: get Spark Session
+        """
+        for key in self.addtionalconfig:
+            self.sconf.set(key, self.addtionalconfig[key])
+        try:
+            if hasattr(self, 'spark'):
+            # pylint: disable=E0203
+                self.spark.stop()
+        # pylint: disable=W0201
+            self.spark = SparkSession.builder.\
+            appName(self.appname).\
+            master(self.master).\
+            config(conf=self.sconf).getOrCreate()
+            self.spark.sparkContext.setLogLevel(self.loglevel)
+            return self.spark
+        except Exception as exp:
+            raise  Exception('Error Building Spark Session').with_traceback(exp.__traceback__)
+    def stop(self):
+        """
+        @Function:Stop Spark Session
+        """
+        if hasattr(self, 'spark') and self.spark is not None:
+            self.spark.stop()