From: ashishj1729
Date: Mon, 12 Dec 2022 08:32:40 +0000 (+0530)
Subject: Resolving Code Smells
X-Git-Tag: 1.0.0^0
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=889b0feb29234b85a96392aff531cde326205a67;p=aiml-fw%2Fathp%2Fdata-extraction.git

Resolving Code Smells

Issue-Id: AIMLFW-6
Signed-off-by: ashishj1729
Change-Id: I91ac6cbaeca5c7fb9cf78b96965d45c935b19f40
---

diff --git a/dataextraction/ConfigHelper.py b/dataextraction/ConfigHelper.py
index 3f408ba..2f1e2a5 100644
--- a/dataextraction/ConfigHelper.py
+++ b/dataextraction/ConfigHelper.py
@@ -24,6 +24,7 @@ import collections
 import os
 import re
 from tmgr_logger import TMLogger
+from exceptions_util import DataExtractionException
 
 class Singleton(type):
@@ -57,7 +58,7 @@ class ConfigHelper(metaclass=Singleton):
 
         self.logger = self.tm_logger.logger
-        self.log_level = self.tm_logger.LogLevel
+        self.log_level = self.tm_logger.log_level
         self.logger.debug("+++++++++++++++ Initializing Config Helper +++++++++++++++")
         self.logger.debug(str(self.exec_path))
         self.fs_dict = {}
@@ -95,7 +96,7 @@ class ConfigHelper(metaclass=Singleton):
                 self.logger.debug("Getting Env Variables for: " + baseclassname)
                 value = os.getenv(self.envconfig_file[key])
                 if value is None:
-                    raise Exception("Error: Environment Variable Not Set " + self.envconfig_file[key])
+                    raise DataExtractionException("Error: Environment Variable Not Set " + self.envconfig_file[key])
                 self.envconfig[key] = value
                 self.logger.debug("Read Environment Config var: " + self.envconfig_file[key])
         except Exception as exc:
@@ -120,9 +121,9 @@ class ConfigHelper(metaclass=Singleton):
         """
         try:
             classflavour = "Error"
-            if my_classinstance.ClassType == "Default":
+            if my_classinstance.class_type == "Default":
                 classflavour = my_classinstance.flavour
-            elif my_classinstance.ClassType == "Custom":
+            elif my_classinstance.class_type == "Custom":
                 classflavour = my_classinstance.__class__.__name__
             baseclass = my_classinstance.__class__.__base__.__name__
             self.logger.debug("baseclass is set to " + baseclass)
@@ -132,8 +133,8 @@ class ConfigHelper(metaclass=Singleton):
             self.logger.debug("Source basic properties:" + str(source_props))
             source_props_dict = self.__InjectEnvValues(source_props)
 
-            if ((source_props_dict["ClassType"] == "Default" or
-                 source_props_dict["ClassType"] == "Custom")
+            if ((source_props_dict["class_type"] == "Default" or
+                 source_props_dict["class_type"] == "Custom")
                 and baseclass != "Transform" and classflavour != 'InfluxSource'):
                 injectedconfigsparkconf = self.\
                     __InjectEnvValues(configsource[classflavour + ".SparkConf"])
@@ -154,7 +155,7 @@ class ConfigHelper(metaclass=Singleton):
         try:
             config = self.getConfigSource(baseclassname)
             test = config[my_classname]
-            if test["ClassType"] == "Default":
+            if test["class_type"] == "Default":
                 return True
             return False
         except Exception as exp:
@@ -202,14 +203,14 @@ class ConfigHelper(metaclass=Singleton):
         try:
             for key in configdict:
                 string = configdict[key]
-                matchlist = re.findall(r'\$ENV\{[a-zA-Z_0-9]*\}', string)
+                matchlist = re.findall(r'\$ENV\{\w*\}', string)  # \w stands for [a-zA-Z_0-9]
                 self.logger.debug("Injecting Env Values for key-value " + key + " " + string)
                 for replacestr in matchlist:
                     envkey = replacestr[5:-1]
                     envkey = envkey.lower()
                     replacevalue = self.envconfig[envkey]
                     if replacevalue is None or not isinstance(replacevalue, str):
-                        raise Exception("environment variable not found "\
-                                        + self.envconfig_file[envkey])
+                        raise DataExtractionException("environment variable not found "\
+                                                      + self.envconfig_file[envkey])
                     string = string.replace(replacestr, replacevalue)
                 configdict[key] = string
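
Note: the regex change above is behavior-preserving. For reference, the $ENV{...} placeholder substitution that ConfigHelper performs boils down to the following standalone sketch (illustrative only: the function name and the direct os.getenv lookup are assumptions; the module itself resolves keys through its cached envconfig dict):

    import os
    import re

    def inject_env_values(configdict):
        """Replace every $ENV{NAME} token in the dict's values with NAME's value."""
        for key, value in configdict.items():
            # \w is shorthand for [a-zA-Z0-9_], so the two patterns match the same tokens
            for token in re.findall(r'\$ENV\{\w*\}', value):
                name = token[5:-1]              # strip the "$ENV{" prefix and "}" suffix
                replacement = os.getenv(name)
                if replacement is None:
                    raise RuntimeError("environment variable not found: " + name)
                value = value.replace(token, replacement)
            configdict[key] = value
        return configdict

    # With Influx_SourceIP=10.0.0.5 exported:
    #   inject_env_values({"url": "http://$ENV{Influx_SourceIP}:8086"})
    #   -> {"url": "http://10.0.0.5:8086"}
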
diff --git a/dataextraction/FeatureEngineeringFactory.py b/dataextraction/FeatureEngineeringFactory.py
index 094d5e3..06dcbcb 100644
--- a/dataextraction/FeatureEngineeringFactory.py
+++ b/dataextraction/FeatureEngineeringFactory.py
@@ -62,7 +62,7 @@ class FeatureEngineeringFactory():
             self.logger.debug("Class to be imported " + import_class)
             my_class = getattr(importlib.import_module(import_class), my_classorflavour)
             my_classinstance = my_class(my_classorflavour)
-        elif default:
+        else:
             self.logger.debug("Module Name " + self.modulename)
             self.logger.debug("Default Class Name " + self.default_class)
             my_class = getattr(importlib.import_module(self.modulename + "." + self.default_class), self.default_class)
@@ -95,7 +95,7 @@ class FeatureEngineeringFactory():
 
         return my_instancelist
 
-    def getBatchPipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+    def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
         """
         @Function: Gets the batch pipeline
         @Input: source classes, sink classes, transform classes
diff --git a/dataextraction/Pipeline.py b/dataextraction/Pipeline.py
index c7bdeaa..4acfff9 100644
--- a/dataextraction/Pipeline.py
+++ b/dataextraction/Pipeline.py
@@ -36,14 +36,14 @@ class Pipeline():
         self.logger.debug("Pipeline Created")
         self.spark_dflist = None
         self.transformed_df = None
-    def loadData(self, session):
+    def load_data(self, session):
         """
         @Function: Loads data from source
         """
         self.logger.info("Source:" + str(self.sources[0]))
         self.spark_dflist = self.sources[0].load(session)
         self.logger.info("Data Load Completed")
-    def transformData(self, session):
+    def transform_data(self, session):
         """
         @Function: Transforms data
         """
@@ -52,7 +52,7 @@ class Pipeline():
         else:
             self.transformed_df = self.transformers[0].transform(session, self.spark_dflist)
         self.logger.info("Data Transform Completed")
-    def writeData(self, session):
+    def write_data(self, session):
         """
         @Function: Writes data
         """
@@ -62,6 +62,6 @@ class Pipeline():
         """
         @Function: Executes the pipeline
         """
-        self.loadData(session)
-        self.transformData(session)
-        self.writeData(session)
+        self.load_data(session)
+        self.transform_data(session)
+        self.write_data(session)
diff --git a/dataextraction/SparkHelper.py b/dataextraction/SparkHelper.py
index 30676c9..f649e6e 100644
--- a/dataextraction/SparkHelper.py
+++ b/dataextraction/SparkHelper.py
@@ -24,7 +24,7 @@ from pyspark import SparkConf
 from pyspark.sql import SparkSession
 from ConfigHelper import ConfigHelper
 
-class sparkSessionManager():
+class SparkSessionManager():
     """
     @Module: Spark Session Manager
     """
@@ -48,16 +48,16 @@ class sparkSessionManager():
         self.loglevel = base["Override_Log_Level"]
         self.sconf = SparkConf()
         self.addtionalconfig = config["ExtraConfig"]
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         """
         @Function: Adds configuration at runtime
         """
         self.sconf.set(key, value)
-    def getAllConf(self):
+    def get_all_conf(self):
         self.logger.debug("*********** ALL CONF *** " + str(self.sconf.getAll()))
-    def getSession(self):
+    def get_session(self):
         """
         @Function: Gets the Spark session
         """
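
With the session manager renamed to CapWords and its methods to snake_case, call sites now read as below (a minimal usage sketch, assuming dataextraction/ is on sys.path; the SparkConf key/value shown is a placeholder, not something the module prescribes):

    from SparkHelper import SparkSessionManager

    helper = SparkSessionManager()
    helper.add_conf("spark.app.name", "data-extraction-demo")  # set a single SparkConf entry
    helper.get_all_conf()                                      # debug-log everything configured
    session = helper.get_session()                             # obtain the SparkSession
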
diff --git a/dataextraction/exceptions_util.py b/dataextraction/exceptions_util.py
new file mode 100644
index 0000000..cc42714
--- /dev/null
+++ b/dataextraction/exceptions_util.py
@@ -0,0 +1,46 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+class DataExtractionException(Exception):
+    """
+    A class used to represent a data-extraction exception.
+
+    Attributes
+    ----------
+    message : str
+        a formatted string describing the exception
+    code : int
+        HTTP status code
+    """
+
+    # Note: call sites (e.g. ConfigHelper.py above) raise this with a single
+    # message string, so the message comes first and the code is optional.
+    def __init__(self, message="exception occurred", code=None):
+        """
+        Parameters
+        ----------
+        message : str
+            a formatted string describing the exception
+        code : int
+            HTTP status code
+        """
+
+        self.code = code
+        self.message = message
+        super().__init__(self.message)
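
Since the call sites in ConfigHelper.py raise this exception with a single message string, the constructor above takes the message first and leaves the HTTP code optional. A typical use (a sketch; the message text and code are examples, not fixed by the module):

    from exceptions_util import DataExtractionException

    try:
        raise DataExtractionException("Error: Environment Variable Not Set fs_db_ip", code=500)
    except DataExtractionException as exc:
        print(exc.code, exc.message)   # -> 500 Error: Environment Variable Not Set fs_db_ip
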
diff --git a/dataextraction/main.py b/dataextraction/main.py
index 20932cf..915d1d6 100644
--- a/dataextraction/main.py
+++ b/dataextraction/main.py
@@ -31,19 +31,20 @@ import jsonpickle
 from flask import Flask
 from flask_restful import request
 from flask_api import status
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
 from ConfigHelper import ConfigHelper
 from FeatureEngineeringFactory import FeatureEngineeringFactory
 
 fsConf = ConfigHelper()
 logger = fsConf.getLogger()
-session_helper = sparkSessionManager()
+session_helper = SparkSessionManager()
 factory = FeatureEngineeringFactory(session_helper)
 tasks = queue.Queue()
 task_map = {}
 infinte_loop_config = {"infinte_run": "True", "unit_test_mode": "False"}
+default_mime_type = 'application/json'
 
-class task():
+class Task():
     """
     Task Class
     """
@@ -84,7 +85,7 @@ def post_handle():
         api_result_msg = "/task-status/" + task_id
         logger.debug("Generated ID " + task_id)
         tasks.put(task_id)
-        task_map[task_id] = task(request_json, "Accepted")
+        task_map[task_id] = Task(request_json, "Accepted")
         logger.debug("Generated ID " + task_id)
     except Exception as exc:
         api_result_msg = str(exc)
@@ -96,7 +97,7 @@ def post_handle():
         dumps(\
         {"trainingjob_name": request_json["sink"]["CassandraSink"]["CollectionName"],\
          "result": api_result_msg}),\
-        status=response_code, mimetype='application/json')
+        status=response_code, mimetype=default_mime_type)
     end_time = datetime.datetime.now()
     logger.info(str(end_time - start_time) + ' API call finished')
     return response
@@ -119,7 +120,7 @@ def get_task_status(task_id):
     response = app.response_class(response=json.dumps(
         {"task_status": taskstatus, "result": api_result_msg}),
-        status=response_code, mimetype='application/json')
+        status=response_code, mimetype=default_mime_type)
     return response
 
 @app.route('/task-statuses', methods=['GET'])
 def get_task_statuses():
@@ -134,7 +135,7 @@ def get_task_statuses():
         response = str(exc)
 
     response = app.response_class(response,
-        status=response_code, mimetype='application/json')
+        status=response_code, mimetype=default_mime_type)
     return response
 
 @app.route('/delete-task-status/', methods=['DELETE'])
 def delete_task_status(task_id):
@@ -149,7 +150,7 @@ def delete_task_status(task_id):
     except Exception as exc:
         response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
         api_result_msg = str(exc)
-    response = app.response_class(response=json.dumps({"trainingjob_name": task_id, "result": api_result_msg}), status=response_code, mimetype='application/json')
+    response = app.response_class(response=json.dumps({"trainingjob_name": task_id, "result": api_result_msg}), status=response_code, mimetype=default_mime_type)
     return response
 
 def async_code_worker():
@@ -160,7 +161,7 @@ def async_code_worker():
     while infinte_loop_config["infinte_run"] == "True":
         try:
             start_time = datetime.datetime.now()
-            logger.debug(str(start_time) + "Feature Engineering Pipeline Started")
+            logger.debug(str(start_time) + " Feature Engineering Pipeline Started")
             task_id = tasks.get()
             request_json = task_map[task_id].task
             task_map[task_id].status = "In Progress"
@@ -169,16 +170,16 @@ def async_code_worker():
             sink_dict = request_json["sink"]
             c_key = str(source_dict) + str(transform_dict) + str(sink_dict)
             logger.debug(c_key)
-            feature_engineering_pipeline = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, c_key)
-            session = session_helper.getSession()
-            feature_engineering_pipeline.loadData(session)
-            feature_engineering_pipeline.transformData(session)
-            feature_engineering_pipeline.writeData(session)
+            feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key)
+            session = session_helper.get_session()
+            feature_engineering_pipeline.load_data(session)
+            feature_engineering_pipeline.transform_data(session)
+            feature_engineering_pipeline.write_data(session)
             session_helper.stop()
             task_map[task_id].status = "Completed"
             tasks.task_done()
             end_time = datetime.datetime.now()
-            logger.debug(str(end_time) + "Feature Engineering Pipline Ended")
+            logger.debug(str(end_time) + " Feature Engineering Pipeline Ended")
         except Exception as exc:
             session_helper.stop()
             traceback.print_exc()
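
Stripped of the Spark specifics, the async_code_worker loop above is the standard single-consumer queue pattern (a generic sketch, not the service's code):

    import queue
    import threading

    tasks = queue.Queue()

    def worker():
        while True:
            task_id = tasks.get()        # blocks until the POST handler enqueues an id
            try:
                pass                     # run the feature engineering pipeline for task_id
            finally:
                tasks.task_done()        # mark the item done whether it succeeded or failed

    threading.Thread(target=worker, daemon=True).start()
    tasks.put("some-task-id")
    tasks.join()                         # returns once every queued task is marked done
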
diff --git a/dataextraction/sink/Base.py b/dataextraction/sink/Base.py
index bb627e7..01ba420 100644
--- a/dataextraction/sink/Base.py
+++ b/dataextraction/sink/Base.py
@@ -28,7 +28,7 @@ class Sink():
         """
         @Method: No Args Constructor
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
     @abstractmethod
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/sink/CassandraSink.py b/dataextraction/sink/CassandraSink.py
index 736916e..1393eed 100644
--- a/dataextraction/sink/CassandraSink.py
+++ b/dataextraction/sink/CassandraSink.py
@@ -43,11 +43,9 @@ class CassandraSink(Sink):
         @Method: Constructor
         @Input: classflavour
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
         self.flavour = classflavour
         self.logger = None
-        classconfig = None
-        sparkconfig = None
         self.sparkloadkey = None
         self.writetype = None
         self.tableName = None
@@ -68,13 +66,13 @@ class CassandraSink(Sink):
         self.tableName = inputdict["CollectionName"]
 
         classconfig, sparkconfig = confighelper.getClassConfig(self)
-        envConf = confighelper.getEnvConfig()
+        env_conf = confighelper.getEnvConfig()
 
-        self.keyspace = envConf["cassandra_sinkdb"]
-        self.host = envConf["fs_db_ip"]
-        self.port = envConf["fs_db_port"]
-        self.user = envConf["fs_db_user"]
-        self.passw = envConf["fs_db_password"]
+        self.keyspace = env_conf["cassandra_sinkdb"]
+        self.host = env_conf["fs_db_ip"]
+        self.port = env_conf["fs_db_port"]
+        self.user = env_conf["fs_db_user"]
+        self.passw = env_conf["fs_db_password"]
 
         self.sparkloadkey = classconfig["SparkLoadKey"]
         self.writetype = classconfig["WriteMode"]
@@ -88,9 +86,9 @@ class CassandraSink(Sink):
             print("Spark Config", key, sparkconfig[key])
             if value.startswith("$Input$"):
                 inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
+                sparkhelper.add_conf(key, inputdict[inputkey])
             else:
-                sparkhelper.addConf(key, value)
+                sparkhelper.add_conf(key, value)
 
     def write(self, sparksession, sparkdf):
         """
@@ -140,7 +138,7 @@ class CassandraSink(Sink):
 
         session = cluster.connect(self.keyspace)
 
-        session.execute(self.buildDeleteTable(sparkdf))
+        session.execute(self.buildDeleteTable())
         query = self.buildCreateTable(sparkdf)
         session.execute(query)
@@ -165,7 +163,7 @@ class CassandraSink(Sink):
         self.logger.debug("Create table query " + query)
         return query
 
-    def buildDeleteTable(self, sparkdf):
+    def buildDeleteTable(self):
         """
         Builds a simple Cassandra query for deleting a table
         """
@@ -175,7 +173,7 @@ class CassandraSink(Sink):
 
     def buildKeyspaceQuery(self, keyspace):
         """
-        Builds cassandra query for creating keyspace
+        Builds a Cassandra query for creating a keyspace
         """
         query = (
             "CREATE KEYSPACE IF NOT EXISTS "
diff --git a/dataextraction/sink/DefaultSparkSink.py b/dataextraction/sink/DefaultSparkSink.py
index a66b6ee..88b552e 100644
--- a/dataextraction/sink/DefaultSparkSink.py
+++ b/dataextraction/sink/DefaultSparkSink.py
@@ -29,11 +29,9 @@ class DefaultSparkSink(Sink):
         @Method: Constructor
         @Input: classflavour
         """
-        self.ClassType = "Default"
+        self.class_type = "Default"
         self.flavour = classflavour
         self.logger = None
-        classconfig = None
-        sparkconfig = None
         self.sparkloadkey = None
         self.writetype = None
     def init(self, sparkhelper, confighelper, inputdict):
@@ -52,9 +50,9 @@ class DefaultSparkSink(Sink):
             print("Spark Config", key, sparkconfig[key])
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
+                sparkhelper.add_conf(key, inputdict[inputkey])
             else:
-                sparkhelper.addConf(key, value)
+                sparkhelper.add_conf(key, value)
     def write(self, sparksession, sparkdf):
         """
         @Method: write
diff --git a/dataextraction/sink/SinkClassConfig.ini b/dataextraction/sink/SinkClassConfig.ini
index a00ee13..5e8f7c4 100644
--- a/dataextraction/sink/SinkClassConfig.ini
+++ b/dataextraction/sink/SinkClassConfig.ini
@@ -24,7 +24,7 @@ DefaultClassName=DefaultSparkSink
 [CassandraSink]
 Name=CassandraDB
 Description= Spark connector that supports writing to CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 WriteMode=overwrite
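
Both sink classes resolve their SparkConf sections through the same small convention: a value starting with "$Input$" names a key to look up in the caller-supplied input dict, while anything else is passed through verbatim. In isolation (an illustrative sketch of that shared loop):

    def resolve_spark_conf(sparkhelper, sparkconfig, inputdict):
        """Push every configured key into Spark, resolving $Input$ indirections."""
        for key, value in sparkconfig.items():
            if value.startswith('$Input$'):
                inputkey = value[7:]            # text after the 7-character "$Input$" marker
                sparkhelper.add_conf(key, inputdict[inputkey])
            else:
                sparkhelper.add_conf(key, value)
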
diff --git a/dataextraction/source/Base.py b/dataextraction/source/Base.py
index 5249c3b..e46cd94 100644
--- a/dataextraction/source/Base.py
+++ b/dataextraction/source/Base.py
@@ -28,7 +28,7 @@ class Source():
         """
         @Constructor No Args
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
     @abstractmethod
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/source/CassandraSource.py b/dataextraction/source/CassandraSource.py
index 7911011..f2fc8a9 100644
--- a/dataextraction/source/CassandraSource.py
+++ b/dataextraction/source/CassandraSource.py
@@ -31,7 +31,7 @@ class CassandraSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
         self.flavour = classflavour
         self.logger = None
         self.sparkloadkey = None
@@ -45,12 +45,10 @@ class CassandraSource(Source):
         self.logger = confighelper.getLogger()
         self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour)
         classconfig, sparkconfig = confighelper.getClassConfig(self)
-        envConf = confighelper.getEnvConfig()
-
-
+        env_conf = confighelper.getEnvConfig()
         self.sparkloadkey = classconfig["SparkLoadKey"]
-        self.keyspace = envConf['cassandra_sourcedb']
-        self.table = envConf['cassandra_sourcetable']
+        self.keyspace = env_conf['cassandra_sourcedb']
+        self.table = env_conf['cassandra_sourcetable']
 
         self.logger.debug("Source keyspace:" + str(self.keyspace) + " table-" + str(self.table))
@@ -59,9 +57,9 @@ class CassandraSource(Source):
             value = sparkconfig[key]
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
+                sparkhelper.add_conf(key, inputdict[inputkey])
             else:
-                sparkhelper.addConf(key, value)
+                sparkhelper.add_conf(key, value)
 
         self.logger.debug("Spark Cassandra source initialized as " + self.flavour)
diff --git a/dataextraction/source/DefaultSparkSource.py b/dataextraction/source/DefaultSparkSource.py
index 722942a..fdec526 100644
--- a/dataextraction/source/DefaultSparkSource.py
+++ b/dataextraction/source/DefaultSparkSource.py
@@ -29,7 +29,7 @@ class DefaultSparkSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType = "Default"
+        self.class_type = "Default"
         self.flavour = classflavour
         self.logger = None
         self.sparkloadkey = None
@@ -47,9 +47,9 @@ class DefaultSparkSource(Source):
             value = sparkconfig[key]
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
+                sparkhelper.add_conf(key, inputdict[inputkey])
             else:
-                sparkhelper.addConf(key, value)
+                sparkhelper.add_conf(key, value)
         self.logger.debug("Spark Default Source initialized as " + self.flavour)
     def load(self, sparksession):
         """
diff --git a/dataextraction/source/InfluxSource.py b/dataextraction/source/InfluxSource.py
index 4103d53..de04a2f 100644
--- a/dataextraction/source/InfluxSource.py
+++ b/dataextraction/source/InfluxSource.py
@@ -32,7 +32,7 @@ class InfluxSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
         self.flavour = classflavour
         self.logger = None
         self.classconfig = None
diff --git a/dataextraction/source/SourceClassConfig.ini b/dataextraction/source/SourceClassConfig.ini
index a5ec5db..f8af9ed 100644
--- a/dataextraction/source/SourceClassConfig.ini
+++ b/dataextraction/source/SourceClassConfig.ini
@@ -18,7 +18,7 @@
 [InfluxSource]
 Name=Influxdb
 Description= Spark connector that supports reading from InfluxDB
-ClassType=Custom
+class_type=Custom
 url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
 token = $ENV{Influx_Token}
 org = $ENV{Influx_DBHeirarchyL1}
@@ -32,7 +32,7 @@ query=this query for influx db
 [CassandraSource]
 Name=Cassandradb
 Description= Spark connector that supports reading from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 
 [CassandraSource.SparkConf]
diff --git a/dataextraction/tmgr_logger.py b/dataextraction/tmgr_logger.py
index fa464d0..93cb199 100644
--- a/dataextraction/tmgr_logger.py
+++ b/dataextraction/tmgr_logger.py
@@ -40,7 +40,7 @@ class TMLogger(object):  # pylint: disable=too-few-public-methods
             with open(conf_file, 'r') as file:
                 log_config = yaml.safe_load(file.read())
             logging.config.dictConfig(log_config)
-            self.LogLevel = log_config["root"]["level"]
+            self.log_level = log_config["root"]["level"]
             self.logger = logging.getLogger(__name__)
         except FileNotFoundError as err:
             print("error opening yaml config file")
@@ -58,6 +58,6 @@ class TMLogger(object):  # pylint: disable=too-few-public-methods
         return self.logger
 
     @property
-    def get_logLevel(self):
-        return self.LogLevel
+    def get_log_level(self):
+        return self.log_level
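
For context, the renamed log_level attribute is simply the root logger level that TMLogger reads out of the YAML file it feeds to logging.config.dictConfig. A minimal equivalent (a sketch using an inline dict in place of the repo's log_config.yaml, whose full contents are not shown here):

    import logging.config

    # mirrors the shape of the YAML that TMLogger loads with yaml.safe_load
    log_config = {"version": 1, "root": {"level": "DEBUG", "handlers": []}}
    logging.config.dictConfig(log_config)
    log_level = log_config["root"]["level"]   # what TMLogger now exposes as self.log_level
    print(log_level)                          # -> DEBUG
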
diff --git a/dataextraction/transform/Base.py b/dataextraction/transform/Base.py
index 2243dfc..ac1c4c3 100644
--- a/dataextraction/transform/Base.py
+++ b/dataextraction/transform/Base.py
@@ -29,7 +29,7 @@ class Transform():
         """
         @Method: No Args Constructor
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
     @abstractmethod
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/transform/DefaultSparkTransform.py b/dataextraction/transform/DefaultSparkTransform.py
index 0d0c927..408d5f2 100644
--- a/dataextraction/transform/DefaultSparkTransform.py
+++ b/dataextraction/transform/DefaultSparkTransform.py
@@ -28,7 +28,7 @@ class DefaultSparkTransform(Transform):
         """
         @Method: Constructor
         """
-        self.ClassType = "Default"
+        self.class_type = "Default"
         self.flavour = classflavour
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/transform/TransformClassConfig.ini b/dataextraction/transform/TransformClassConfig.ini
index e54c49b..abbd770 100644
--- a/dataextraction/transform/TransformClassConfig.ini
+++ b/dataextraction/transform/TransformClassConfig.ini
@@ -18,7 +18,7 @@
 [SQLTransform]
 Name=SQLTransformer
 Description= SQLTransformer transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
 
 [ModuleDetails]
 ModuleName=transform
diff --git a/test/sink/SinkClassConfig.ini b/test/sink/SinkClassConfig.ini
index a00ee13..5e8f7c4 100644
--- a/test/sink/SinkClassConfig.ini
+++ b/test/sink/SinkClassConfig.ini
@@ -24,7 +24,7 @@ DefaultClassName=DefaultSparkSink
 [CassandraSink]
 Name=CassandraDB
 Description= Spark connector that supports writing to CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 WriteMode=overwrite
diff --git a/test/source/SourceClassConfig.ini b/test/source/SourceClassConfig.ini
index 15ff6b5..e5b83c8 100644
--- a/test/source/SourceClassConfig.ini
+++ b/test/source/SourceClassConfig.ini
@@ -19,7 +19,7 @@
 [InfluxSource]
 Name=Influxdb
 Description= Spark connector that supports reading from InfluxDB
-ClassType=Custom
+class_type=Custom
 url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
 token = $ENV{Influx_Token}
 org = $ENV{Influx_DBHeirarchyL1}
@@ -33,7 +33,7 @@ query=this query for influx db
 [CassandraSource]
 Name=Cassandradb
 Description= Spark connector that supports reading from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 
 [CassandraSource.SparkConf]
diff --git a/test/test_DefaultSparkSource.py b/test/test_DefaultSparkSource.py
index e1a1b8e..483caad 100644
--- a/test/test_DefaultSparkSource.py
+++ b/test/test_DefaultSparkSource.py
@@ -35,7 +35,7 @@ class Spark_helper:
         self.test_result = False
         self.read = helper2()
 
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         self.test_result = True
 
     def load(self):
diff --git a/test/test_cassandrasink.py b/test/test_cassandrasink.py
index 9349d47..17e6cad 100644
--- a/test/test_cassandrasink.py
+++ b/test/test_cassandrasink.py
@@ -32,7 +32,7 @@ class sparkSessionHelper():
     def __init__(self):
         pass
 
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         pass
 
 class df_helper():
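
One easy-to-miss consequence of such renames: unittest.mock patch targets are plain strings, so a stale target only fails when the test runs, never at import time. The updated targets in test_main.py below patch the names where they are looked up, e.g. (a reminder sketch; the test function name here is illustrative):

    from unittest.mock import patch

    @patch('main.session_helper.get_session')
    @patch('main.factory.get_batch_pipeline')
    def test_worker(mock_pipeline, mock_session):
        ...  # exercise main.async_code_worker with both collaborators stubbed
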
diff --git a/test/test_main.py b/test/test_main.py
index a2499f5..d003093 100644
--- a/test/test_main.py
+++ b/test/test_main.py
@@ -29,9 +29,9 @@ sys.path.extend(["dataextraction/"])  # In order to import dataextraction functions
 
 import main
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
 from Pipeline import Pipeline
-from main import task
+from main import Task
@@ -66,7 +66,7 @@ class Test_task_status:
     def test_task_status(self):
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json, "Accepted")
+        main.task_map["unittest_task"] = Task(request_json, "Accepted")
         response = self.client.get("/task-status/unittest_task")
         main.task_map.clear()
         assert response.content_type == 'application/json'
@@ -75,7 +75,7 @@ class Test_task_status:
     # Error test
     def test_negative_task_status(self):
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json, "Error")
+        main.task_map["unittest_task"] = Task(request_json, "Error")
         response = self.client.get("/task-status/unittest_task")
         main.task_map.clear()
@@ -96,7 +96,7 @@ class Test_delete_task_status:
     def test_delete_task_status(self):
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json, "Accepted")
+        main.task_map["unittest_task"] = Task(request_json, "Accepted")
         response = self.client.delete("/delete-task-status/unittest_task")
         main.task_map.clear()
         assert response.content_type == 'application/json'
@@ -112,8 +112,8 @@ class Test_async_code_worker:
     def setup_method(self):
         self.client = main.app.test_client(self)
 
-    @patch('main.session_helper.getSession')
-    @patch('main.factory.getBatchPipeline')
+    @patch('main.session_helper.get_session')
+    @patch('main.factory.get_batch_pipeline')
     def test_negative_async_code_worker_1(self, mock1, mock2):
         main.infinte_loop_config["infinte_run"] = "True"
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index 62f0d10..e48e5b7 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -25,7 +25,7 @@ sys.path.extend(["dataextraction/"])
 
 from Pipeline import Pipeline
 from FeatureEngineeringFactory import FeatureEngineeringFactory
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
@@ -48,10 +48,10 @@ class Test_Pipeline:
         api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
         load_dotenv('test/test_env.env')
         os.environ['CODE_DIR_PATH'] = 'test'
-        session_helper = sparkSessionManager()
+        session_helper = SparkSessionManager()
         factory = FeatureEngineeringFactory(session_helper)
         (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
-        self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+        self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
         self.spark_session = session_helper
@@ -60,27 +60,27 @@ class Test_Pipeline:
 
-    def test_loadData(self):
+    def test_load_data(self):
         assert self.obj != None, 'Pipeline Object Creation, Failed'
         self.obj.sources[0] = helper()
-        self.obj.loadData(self.spark_session)
+        self.obj.load_data(self.spark_session)
         assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed'
 
-    def test_transformData(self):
+    def test_transform_data(self):
         self.obj.transformers[0] = helper()
-        self.obj.transformData(self.spark_session)
+        self.obj.transform_data(self.spark_session)
         assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed'
 
-    def test_transformDataWithNoTransform(self):
+    def test_transform_data_with_no_transform(self):
         self.obj.transformers = None
         self.obj.spark_dflist = 'Data Transform Completed'
-        self.obj.transformData(self.spark_session)
+        self.obj.transform_data(self.spark_session)
         assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed When No Transformer is specified'
 
-    def test_writeData(self):
+    def test_write_data(self):
         self.obj.sinks[0] = helper()
-        self.obj.writeData(self.spark_session)
+        self.obj.write_data(self.spark_session)
         assert True
diff --git a/test/test_tmgr_logger.py b/test/test_tmgr_logger.py
index 99501e1..07a1d6f 100644
--- a/test/test_tmgr_logger.py
+++ b/test/test_tmgr_logger.py
@@ -36,7 +36,7 @@ def test_get_logLevel():
     load_dotenv('test/test_env.env')
     os.environ['CODE_DIR_PATH'] = 'test'
     tm_logger = TMLogger("test/log_config.yaml")
-    assert None != tm_logger.get_logLevel
+    assert None != tm_logger.get_log_level
 
 def test_init_withWrongFile():
     load_dotenv('test/test_env.env')
diff --git a/test/transform/TransformClassConfig.ini b/test/transform/TransformClassConfig.ini
index c63fa4b..bbcf30e 100644
--- a/test/transform/TransformClassConfig.ini
+++ b/test/transform/TransformClassConfig.ini
@@ -19,7 +19,7 @@
 [SQLTransform]
 Name=SQLTransformer
 Description= SQLTransformer transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
 
 [ModuleDetails]
 ModuleName=transform