From: josephthaliath
Date: Tue, 18 Oct 2022 09:12:22 +0000 (+0530)
Subject: Feature engineering pipeline changes
X-Git-Tag: 1.0.0~13
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=e6ca5ab02ddfe6209a7e6a17f15d6de8719bbb15;p=aiml-fw%2Fathp%2Fdata-extraction.git

Feature engineering pipeline changes

Issue-Id: AIMLFW-2
Signed-off-by: josephthaliath
Change-Id: I01111d4d65a72ef9b95b4c801d11f3f3a6122ebe
---

diff --git a/dataextraction/FeatureEngineeringFactory.py b/dataextraction/FeatureEngineeringFactory.py
new file mode 100644
index 0000000..094d5e3
--- /dev/null
+++ b/dataextraction/FeatureEngineeringFactory.py
@@ -0,0 +1,118 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Factory to create and return instances of Pipeline
+"""
+import importlib
+# pylint: disable=E0611
+from lru import LRU
+from ConfigHelper import ConfigHelper
+from Pipeline import Pipeline
+
+class FeatureEngineeringFactory():
+    """
+    @Class: Factory to create and return instances of Pipeline
+    """
+    def __init__(self, spark_manager):
+        """
+        @Constructor
+        @Input: spark_manager
+        """
+        self.config_help = ConfigHelper()
+        self.logger = self.config_help.getLogger()
+        self.modulename = ""
+        self.default_class = ""
+        self.hash_table = LRU(10)
+        self.logger.debug("************Starting Init FeatureEngineeringFactory***********")
+        try:
+            self.spark_session = spark_manager
+        except Exception as exc:
+            raise Exception('Spark session Error').with_traceback(exc.__traceback__)
+    def create_instance(self, baseclass_name, my_classorflavour, inputs_to_class):
+        """
+        @Function: Creates a class instance
+        @Input: base class name, class/flavour name, and inputs to the class
+        @Output: class instance
+        """
+        self.logger.info("BaseClassName " + baseclass_name + " Class Flavour " + my_classorflavour)
+        default = self.config_help.isDefault(baseclass_name, my_classorflavour)
+        self.modulename = self.config_help.getModuleName(baseclass_name)
+        self.default_class = self.config_help.getDefaultClassName(baseclass_name)
+        self.logger.debug("Instantiation of module " + self.modulename)
+        self.logger.debug("Default Class " + self.default_class)
+        self.logger.debug("Class OR Flavour " + my_classorflavour)
+        if not default:
+            import_class = self.modulename + "." + my_classorflavour
+            self.logger.debug("Class to be imported " + import_class)
+            my_class = getattr(importlib.import_module(import_class), my_classorflavour)
+            my_classinstance = my_class(my_classorflavour)
+        else:
+            self.logger.debug("Module Name " + self.modulename)
+            self.logger.debug("Default Class Name " + self.default_class)
+            my_class = getattr(importlib.import_module(self.modulename + "." + self.default_class), self.default_class)
+            my_classinstance = my_class(my_classorflavour)
+        self.logger.debug("Initialization of Class")
of Class") + my_classinstance.init(self.spark_session, self.config_help, inputs_to_class) + return my_classinstance + + + def __makebulk(self, baseclass_name, class_dictionary): + """ + @Function: Makes Bulk Instances + @ Input : BaseClass Name & Dictionay of Classess + @Output : List of outputs + """ + my_instancelist = [] + self.modulename = self.config_help.getModuleName(baseclass_name) + self.default_class = self.config_help.getDefaultClassName(baseclass_name) + if baseclass_name != "Transform": + for my_class in class_dictionary: + my_instancelist.append(self.create_instance(baseclass_name, my_class, class_dictionary[my_class])) + self.logger.debug("Created instance for Source/Sink..") + elif baseclass_name == "Transform": + for value in class_dictionary: + my_class = value["operation"] + inputdict = value + inputdict.pop("operation") + self.logger.debug("Instanciating Base Class "+baseclass_name+" My Class "+my_class) + my_instancelist.append(self.create_instance(baseclass_name, my_class, inputdict)) + return my_instancelist + + + def getBatchPipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key): + """ + @Function: Makes to get Batch Pipeline + @ Input : source Classess, Sink Classess, Transform Classess + @Output : Instance of Pipeline Object + """ + if self.hash_table.get(caching_key) is None: + self.logger.debug("Cached Instance Not Found, Creating Instance, Key" + caching_key) + source_instancelist = None + transformer_instancelist = None + sink_instancelist = None + + source_instancelist = self.__makebulk("Source", source_classdict) + + if transform_classdict is not None: + transformer_instancelist = self.__makebulk("Transform", transform_classdict) + if sink_classdict is not None: + sink_instancelist = self.__makebulk("Sink", sink_classdict) + + self.hash_table[caching_key] = Pipeline(source_instancelist, transformer_instancelist, sink_instancelist) + return self.hash_table[caching_key] diff --git a/dataextraction/Pipeline.py b/dataextraction/Pipeline.py new file mode 100644 index 0000000..c7bdeaa --- /dev/null +++ b/dataextraction/Pipeline.py @@ -0,0 +1,67 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+# ==================================================================================
+
+"""
+@Module: Feature Engineering Pipeline
+"""
+from ConfigHelper import ConfigHelper
+
+class Pipeline():
+    """
+    @Class: Feature Engineering Pipeline
+    """
+    def __init__(self, sources, transformers, sinks):
+        """
+        @Constructor
+        """
+        self.logger = ConfigHelper().logger
+        self.sources = sources
+        self.transformers = transformers
+        self.sinks = sinks
+        self.logger.debug("Pipeline Created")
+        self.spark_dflist = None
+        self.transformed_df = None
+    def loadData(self, session):
+        """
+        @Function: Loads data from the source
+        """
+        self.logger.info("Source: " + str(self.sources[0]))
+        self.spark_dflist = self.sources[0].load(session)
+        self.logger.info("Data Load Completed")
+    def transformData(self, session):
+        """
+        @Function: Transforms the data
+        """
+        if self.transformers is None:
+            self.transformed_df = self.spark_dflist
+        else:
+            self.transformed_df = self.transformers[0].transform(session, self.spark_dflist)
+        self.logger.info("Data Transform Completed")
+    def writeData(self, session):
+        """
+        @Function: Writes data to the sink
+        """
+        self.sinks[0].write(session, self.transformed_df)
+        self.logger.info("Data Written to Sink")
+    def execute(self, session):
+        """
+        @Function: Executes the pipeline (load, transform, write)
+        """
+        self.loadData(session)
+        self.transformData(session)
+        self.writeData(session)
diff --git a/dataextraction/SparkConfig.ini b/dataextraction/SparkConfig.ini
new file mode 100644
index 0000000..7ee005b
--- /dev/null
+++ b/dataextraction/SparkConfig.ini
@@ -0,0 +1,6 @@
+[BaseConfig]
+DefaultAppName=Pipeline
+DefaultMaster=local[1]
+Override_Log_Level=ERROR
+[ExtraConfig]
+spark.logConf=true
diff --git a/dataextraction/SparkHelper.py b/dataextraction/SparkHelper.py
new file mode 100644
index 0000000..30676c9
--- /dev/null
+++ b/dataextraction/SparkHelper.py
@@ -0,0 +1,84 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Spark Session Manager
+"""
+import configparser
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+from ConfigHelper import ConfigHelper
+
+class sparkSessionManager():
+    """
+    @Class: Spark Session Manager
+    """
+    def __init__(self):
+        """
+        @Constructor: Spark Session Manager
+        """
+        config = configparser.ConfigParser()
+        confighelp = ConfigHelper()
+        self.logger = confighelp.getLogger()
+        self.loglevel = confighelp.log_level
+        code_path = confighelp.exec_path
+        if code_path is None:
+            config.read("SparkConfig.ini")
+        else:
+            config.read(code_path + "/SparkConfig.ini")
+        base = config['BaseConfig']
+        self.appname = base["DefaultAppName"]
+        self.master = base["DefaultMaster"]
+        if base.get("Override_Log_Level") is not None:
+            self.loglevel = base["Override_Log_Level"]
+        self.sconf = SparkConf()
+        self.additionalconfig = config["ExtraConfig"]
+    def addConf(self, key, value):
+        """
+        @Function: Adds a configuration entry at runtime
+        """
+        self.sconf.set(key, value)
+
+    def getAllConf(self):
+        self.logger.debug("*********** ALL CONF *** " + str(self.sconf.getAll()))
+
+    def getSession(self):
+        """
+        @Function: Builds and returns the Spark session
+        """
+        for key in self.additionalconfig:
+            self.sconf.set(key, self.additionalconfig[key])
+        try:
+            if hasattr(self, 'spark'):
+                # pylint: disable=E0203
+                self.spark.stop()
+            # pylint: disable=W0201
+            self.spark = SparkSession.builder.\
+                appName(self.appname).\
+                master(self.master).\
+                config(conf=self.sconf).getOrCreate()
+            self.spark.sparkContext.setLogLevel(self.loglevel)
+            return self.spark
+        except Exception as exp:
+            raise Exception('Error Building Spark Session').with_traceback(exp.__traceback__)
+    def stop(self):
+        """
+        @Function: Stops the Spark session
+        """
+        if hasattr(self, 'spark') and self.spark is not None:
+            self.spark.stop()
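
A minimal usage sketch of the flow this change introduces, assuming the dataextraction modules are on the Python path. The class/flavour names and input dictionaries below are hypothetical placeholders; the real ones come from ConfigHelper's configuration and from the Source/Transform/Sink implementations, which are outside this diff.

    # Usage sketch (not part of this commit); all placeholder names are illustrative only.
    from SparkHelper import sparkSessionManager
    from FeatureEngineeringFactory import FeatureEngineeringFactory

    spark_manager = sparkSessionManager()
    session = spark_manager.getSession()
    factory = FeatureEngineeringFactory(spark_manager)

    # Sources and sinks map a class (or flavour) name to its inputs; transforms are
    # a list of dicts whose "operation" key selects the transform class.
    source_classdict = {"ExampleSource": {"table": "ue_metrics"}}                    # hypothetical
    transform_classdict = [{"operation": "ExampleTransform", "query": "select 1"}]   # hypothetical
    sink_classdict = {"ExampleSink": {"bucket": "features"}}                         # hypothetical

    pipeline = factory.getBatchPipeline(source_classdict, transform_classdict,
                                        sink_classdict, "example_job")
    pipeline.execute(session)
    spark_manager.stop()

Note the design choice in getBatchPipeline: built pipelines are kept in an LRU cache of size 10 keyed by caching_key, so repeated calls with the same key reuse the same Pipeline instance instead of re-importing and re-instantiating the source, transform, and sink classes.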