Resolving Code Smells 58/10058/1 1.0.0 1.0.1
author ashishj1729 <jain.ashish@samsung.com>
Mon, 12 Dec 2022 08:32:40 +0000 (14:02 +0530)
committer ashishj1729 <jain.ashish@samsung.com>
Mon, 12 Dec 2022 08:33:43 +0000 (14:03 +0530)
Issue-Id: AIMLFW-6

Signed-off-by: ashishj1729 <jain.ashish@samsung.com>
Change-Id: I91ac6cbaeca5c7fb9cf78b96965d45c935b19f40

27 files changed:
dataextraction/ConfigHelper.py
dataextraction/FeatureEngineeringFactory.py
dataextraction/Pipeline.py
dataextraction/SparkHelper.py
dataextraction/exceptions_util.py [new file with mode: 0644]
dataextraction/main.py
dataextraction/sink/Base.py
dataextraction/sink/CassandraSink.py
dataextraction/sink/DefaultSparkSink.py
dataextraction/sink/SinkClassConfig.ini
dataextraction/source/Base.py
dataextraction/source/CassandraSource.py
dataextraction/source/DefaultSparkSource.py
dataextraction/source/InfluxSource.py
dataextraction/source/SourceClassConfig.ini
dataextraction/tmgr_logger.py
dataextraction/transform/Base.py
dataextraction/transform/DefaultSparkTransform.py
dataextraction/transform/TransformClassConfig.ini
test/sink/SinkClassConfig.ini
test/source/SourceClassConfig.ini
test/test_DefaultSparkSource.py
test/test_cassandrasink.py
test/test_main.py
test/test_pipeline.py
test/test_tmgr_logger.py
test/transform/TransformClassConfig.ini

diff --git a/dataextraction/ConfigHelper.py b/dataextraction/ConfigHelper.py
index 3f408ba..2f1e2a5 100644
@@ -24,6 +24,7 @@ import collections
 import os
 import re
 from tmgr_logger import TMLogger
+from exceptions_util import DataExtractionException
 
 
 class Singleton(type):
@@ -57,7 +58,7 @@ class ConfigHelper(metaclass=Singleton):
 
 
             self.logger = self.tm_logger.logger
-            self.log_level = self.tm_logger.LogLevel
+            self.log_level = self.tm_logger.log_level
             self.logger.debug("+++++++++++++++Initializaing Config Helper +++++++++++++++++++++++")
             self.logger.debug(str(self.exec_path))
             self.fs_dict ={}
@@ -95,7 +96,7 @@ class ConfigHelper(metaclass=Singleton):
                 self.logger.debug("Getting Env Variables for: " + baseclassname)
                 value=os.getenv(self.envconfig_file[key])
                 if value is None:
-                    raise Exception("Error Environment Variable Not Set"+self.envconfig_file[key] )
+                    raise DataExtractionException("Error Environment Variable Not Set"+self.envconfig_file[key] )
                 self.envconfig[key]=value
                 self.logger.debug("Read Environment Config var: "+self.envconfig_file[key])
         except Exception as exc:
@@ -120,9 +121,9 @@ class ConfigHelper(metaclass=Singleton):
         """
         try:
             classflavour = "Error"
-            if my_classinstance.ClassType == "Default":
+            if my_classinstance.class_type == "Default":
                 classflavour = my_classinstance.flavour
-            elif my_classinstance.ClassType == "Custom":
+            elif my_classinstance.class_type == "Custom":
                 classflavour = my_classinstance.__class__.__name__
             baseclass=my_classinstance.__class__.__base__.__name__
             self.logger.debug("baseclass is Set to "+baseclass)
@@ -132,8 +133,8 @@ class ConfigHelper(metaclass=Singleton):
             self.logger.debug("Source basic properties:" + str(source_props))
             
             source_props_dict = self.__InjectEnvValues(source_props)
-            if ((source_props_dict["ClassType"] == "Default" or
-                    source_props_dict["ClassType"] == "Custom" ) 
+            if ((source_props_dict["class_type"] == "Default" or
+                    source_props_dict["class_type"] == "Custom" ) 
                     and baseclass != "Transform" and classflavour != 'InfluxSource'):
                 injectedconfigsparkconf= self.\
                 __InjectEnvValues(configsource[classflavour+".SparkConf"])
@@ -154,7 +155,7 @@ class ConfigHelper(metaclass=Singleton):
         try:
             config = self.getConfigSource(baseclassname)
             test= config[my_classname]
-            if test["ClassType"] == "Default":
+            if test["class_type"] == "Default":
                 return True
             return False
         except Exception as exp:
@@ -202,14 +203,14 @@ class ConfigHelper(metaclass=Singleton):
         try:
             for key in configdict:
                 string = configdict[key]
-                matchlist=re.findall(r'\$ENV\{[a-zA-Z_0-9]*\}',string)
+                matchlist=re.findall(r'\$ENV\{\w*\}',string) # \w stands for [a-zA-Z_0-9]
                 self.logger.debug("Injecting Env Values for,key-value " + key + " " + string )
                 for replacestr in matchlist:
                     envkey= replacestr[5:-1]
                     envkey =envkey.lower()
                     replacevalue = self.envconfig[envkey]
                     if replacevalue is None or not isinstance(replacevalue,str):
-                        raise Exception("environment variable Not found"\
+                        raise DataExtractionException("environment variable Not found"\
                                         +self.envconfig_file[envkey])
                     string = string.replace(replacestr,replacevalue)
                 configdict[key] = string
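For context, a minimal standalone sketch (not part of this change) of the $ENV{...} substitution that __InjectEnvValues performs in the hunk above; the config key, URL, and environment variable are invented for illustration:

import os
import re

# Hypothetical example mirroring ConfigHelper.__InjectEnvValues above:
# each $ENV{NAME} placeholder is replaced with the value read from the
# environment (placeholder names are lower-cased, as in the hunk).
envconfig = {"influx_sourceip": os.getenv("Influx_SourceIP", "127.0.0.1")}
configdict = {"url": "http://$ENV{Influx_SourceIP}:8086"}

for key, string in configdict.items():
    for placeholder in re.findall(r'\$ENV\{\w*\}', string):
        envkey = placeholder[5:-1].lower()   # strip "$ENV{" and "}"
        string = string.replace(placeholder, envconfig[envkey])
    configdict[key] = string

print(configdict["url"])   # e.g. http://127.0.0.1:8086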
diff --git a/dataextraction/FeatureEngineeringFactory.py b/dataextraction/FeatureEngineeringFactory.py
index 094d5e3..06dcbcb 100644
@@ -62,7 +62,7 @@ class FeatureEngineeringFactory():
             self.logger.debug("Class to be imported " + import_class)
             my_class = getattr(importlib.import_module(import_class), my_classorflavour)
             my_classinstance = my_class(my_classorflavour)
-        elif default:
+        else:
             self.logger.debug("Module Name "+self.modulename)
             self.logger.debug("default Class Name "+self.default_class)
             my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class)
@@ -95,7 +95,7 @@ class FeatureEngineeringFactory():
         return my_instancelist
         
         
-    def getBatchPipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+    def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
         """
         @Function: Makes to get Batch Pipeline
         @ Input  : source Classess, Sink Classess, Transform Classess
diff --git a/dataextraction/Pipeline.py b/dataextraction/Pipeline.py
index c7bdeaa..4acfff9 100644
@@ -36,14 +36,14 @@ class Pipeline():
         self.logger.debug("Pipeline Created")
         self.spark_dflist = None
         self.transformed_df = None
-    def loadData(self, session):
+    def load_data(self, session):
         """
         @Function: Loads data from source
         """
         self.logger.info("Source:" + str(self.sources[0]))
         self.spark_dflist = self.sources[0].load(session)
         self.logger.info("Data Load Completed")
-    def transformData(self, session):
+    def transform_data(self, session):
         """
         @Function : Transform Data
         """
@@ -52,7 +52,7 @@ class Pipeline():
         else:
             self.transformed_df = self.transformers[0].transform(session, self.spark_dflist)
         self.logger.info("Data Transform Completed")
-    def writeData(self, session):
+    def write_data(self, session):
         """
         @Function: Write Data
         """
@@ -62,6 +62,6 @@ class Pipeline():
         """
         @Function : Execute Pipeline
         """
-        self.loadData(session)
-        self.transformData(session)
-        self.writeData(session)
+        self.load_data(session)
+        self.transform_data(session)
+        self.write_data(session)
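For orientation, a minimal sketch (not part of this change) of how the renamed pipeline methods are driven end to end, mirroring the worker in main.py further down; the request dictionaries are abbreviated placeholders shaped like the ones used in the tests:

from SparkHelper import SparkSessionManager
from FeatureEngineeringFactory import FeatureEngineeringFactory

# Hypothetical driver; a real request supplies full source/transform/sink configs.
source_dict = {"InfluxSource": {"query": "from(bucket: ...)"}}
transform_dict = [{"operation": "SQLTransform", "FeatureList": "*", "SQLFilter": ""}]
sink_dict = {"CassandraSink": {"CollectionName": "example_table"}}

session_helper = SparkSessionManager()
factory = FeatureEngineeringFactory(session_helper)
caching_key = str(source_dict) + str(transform_dict) + str(sink_dict)

pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, caching_key)
session = session_helper.get_session()
pipeline.load_data(session)
pipeline.transform_data(session)
pipeline.write_data(session)
session_helper.stop()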
diff --git a/dataextraction/SparkHelper.py b/dataextraction/SparkHelper.py
index 30676c9..f649e6e 100644
@@ -24,7 +24,7 @@ from pyspark import SparkConf
 from pyspark.sql import SparkSession
 from ConfigHelper import ConfigHelper
 
-class sparkSessionManager():
+class SparkSessionManager():
     """
     @Module:Spark Session Manager
     """
@@ -48,16 +48,16 @@ class sparkSessionManager():
             self.loglevel = base["Override_Log_Level"]
         self.sconf = SparkConf()
         self.addtionalconfig = config["ExtraConfig"]
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         """
         @Function: Adding configuration as runtime
         """
         self.sconf.set(key, value)
         
-    def getAllConf(self):
+    def get_all_conf(self):
         self.logger.debug("*********** ALL CONF *** " + str(self.sconf.getAll()))
     
-    def getSession(self):
+    def get_session(self):
         """
         @Function: get Spark Session
         """
diff --git a/dataextraction/exceptions_util.py b/dataextraction/exceptions_util.py
new file mode 100644
index 0000000..cc42714
--- /dev/null
@@ -0,0 +1,46 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+class DataExtractionException(Exception):
+    """
+    A class used to represent an Data Extraction exception
+
+    Attributes
+    ----------
+    message : str
+        a formatted string to print out what is exception
+    code : int
+        http status code
+    """
+
+    def __init__(self, code, message="exception occured"):
+        """
+        Parameters
+        ----------
+        message : str
+            a formatted string to print out what is exception
+        code : int
+            http statuse code
+
+        """
+        
+        self.code = code
+        self.message = message
+        super().__init__(self.message)
+
+
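For context, a minimal sketch (not part of this change) of how the new DataExtractionException might be raised and handled, following the (code, message) constructor above; the helper function and values are illustrative only:

from exceptions_util import DataExtractionException

# Hypothetical guard: fail with the custom exception when a required
# environment variable is missing.
def require_env(name, value):
    if value is None:
        raise DataExtractionException(500, "Environment Variable Not Set: " + name)
    return value

try:
    require_env("fs_db_ip", None)
except DataExtractionException as exc:
    print(exc.code, exc.message)   # -> 500 Environment Variable Not Set: fs_db_ip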
diff --git a/dataextraction/main.py b/dataextraction/main.py
index 20932cf..915d1d6 100644
@@ -31,19 +31,20 @@ import jsonpickle
 from flask import Flask
 from flask_restful import  request
 from flask_api import status
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
 from ConfigHelper import ConfigHelper
 from FeatureEngineeringFactory import FeatureEngineeringFactory
 
 fsConf = ConfigHelper()
 logger = fsConf.getLogger()
-session_helper = sparkSessionManager()
+session_helper = SparkSessionManager()
 factory = FeatureEngineeringFactory(session_helper)
 tasks = queue.Queue()
 task_map = {}
 infinte_loop_config={"infinte_run":"True", "unit_test_mode":"False"}
+default_mime_type = 'application/json'
 
-class task():
+class Task():
     """
     Task Class
     """
@@ -84,7 +85,7 @@ def post_handle():
         api_result_msg = "/task-status/"+task_id
         logger.debug("Generated ID"+task_id)
         tasks.put(task_id)
-        task_map[task_id] = task(request_json ,"Accepted")
+        task_map[task_id] = Task(request_json ,"Accepted")
         logger.debug("Generated ID"+task_id)
     except Exception as exc:
         api_result_msg = str(exc)
@@ -96,7 +97,7 @@ def post_handle():
             dumps(\
         { "trainingjob_name":request_json["sink"]["CassandraSink"]["CollectionName"],\
         "result" : api_result_msg }),\
-        status= response_code,mimetype='application/json')
+        status= response_code,mimetype=default_mime_type)
     end_time = datetime.datetime.now()
     logger.info(str(end_time-start_time)+' API call finished')
     return response
@@ -119,7 +120,7 @@ def get_task_status(task_id):
 
     response = app.response_class(response=json.dumps(
         { "task_status":taskstatus,"result" : api_result_msg }),
-        status= response_code,mimetype='application/json')
+        status= response_code,mimetype=default_mime_type)
     return response
 @app.route('/task-statuses', methods=['GET'])
 def get_task_statuses():
@@ -134,7 +135,7 @@ def get_task_statuses():
         response = str(exc)
 
     response = app.response_class(response,
-        status= response_code,mimetype='application/json')
+        status= response_code,mimetype=default_mime_type)
     return response
 @app.route('/delete-task-status/<task_id>', methods=['DELETE'])
 def delete_task_status(task_id):
@@ -149,7 +150,7 @@ def delete_task_status(task_id):
     except Exception as exc:
         response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
         api_result_msg = str(exc)
-    response = app.response_class(response=json.dumps({ "trainingjob_name": task_id,"result" : api_result_msg }),        status= response_code,mimetype='application/json')
+    response = app.response_class(response=json.dumps({ "trainingjob_name": task_id,"result" : api_result_msg }), status= response_code,mimetype = default_mime_type)
     return response
     
 def async_code_worker():
@@ -160,7 +161,7 @@ def async_code_worker():
     while infinte_loop_config["infinte_run"] == "True":
         try:
             start_time = datetime.datetime.now()
-            logger.debug(str(start_time) +"Feature Engineering Pipeline Started")
+            logger.debug(str(start_time) +"Feature Engineering Pipeline Started |-> tESTING IN Progress V2")
             task_id = tasks.get()
             request_json = task_map[task_id].task
             task_map[task_id].status = "In Progress"
@@ -169,16 +170,16 @@ def async_code_worker():
             sink_dict = request_json["sink"]
             c_key = str(source_dict)+str(transform_dict)+str(sink_dict)
             logger.debug(c_key)
-            feature_engineering_pipeline = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, c_key)
-            session = session_helper.getSession()
-            feature_engineering_pipeline.loadData(session)
-            feature_engineering_pipeline.transformData(session)
-            feature_engineering_pipeline.writeData(session)
+            feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key)
+            session = session_helper.get_session()
+            feature_engineering_pipeline.load_data(session)
+            feature_engineering_pipeline.transform_data(session)
+            feature_engineering_pipeline.write_data(session)
             session_helper.stop()
             task_map[task_id].status = "Completed"
             tasks.task_done()
             end_time = datetime.datetime.now()
-            logger.debug(str(end_time) +"Feature Engineering Pipline Ended")
+            logger.debug(str(end_time) +"Feature Engineering Pipline Ended |-> tESTING IN Progress V2")
         except Exception as exc:
             session_helper.stop()
             traceback.print_exc()
diff --git a/dataextraction/sink/Base.py b/dataextraction/sink/Base.py
index bb627e7..01ba420 100644
@@ -28,7 +28,7 @@ class Sink():
         """
         @Method": No Args Constructor
         """
-        self.ClassType="Custom"
+        self.class_type="Custom"
     @abstractmethod
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/sink/CassandraSink.py b/dataextraction/sink/CassandraSink.py
index 736916e..1393eed 100644
@@ -43,11 +43,9 @@ class CassandraSink(Sink):
         @Methond: Constructor
         @Input : classflavor
         """
-        self.ClassType = "Custom"
+        self.class_type = "Custom"
         self.flavour = classflavour
         self.logger = None
-        classconfig = None
-        sparkconfig = None
         self.sparkloadkey = None
         self.writetype = None
         self.tableName = None
@@ -68,13 +66,13 @@ class CassandraSink(Sink):
 
         self.tableName = inputdict["CollectionName"]
         classconfig, sparkconfig = confighelper.getClassConfig(self)
-        envConf = confighelper.getEnvConfig()
+        env_conf = confighelper.getEnvConfig()
 
-        self.keyspace = envConf["cassandra_sinkdb"]
-        self.host = envConf["fs_db_ip"]
-        self.port = envConf["fs_db_port"]
-        self.user = envConf["fs_db_user"]
-        self.passw = envConf["fs_db_password"]
+        self.keyspace = env_conf["cassandra_sinkdb"]
+        self.host = env_conf["fs_db_ip"]
+        self.port = env_conf["fs_db_port"]
+        self.user = env_conf["fs_db_user"]
+        self.passw = env_conf["fs_db_password"]
 
         self.sparkloadkey = classconfig["SparkLoadKey"]
         self.writetype = classconfig["WriteMode"]
@@ -88,9 +86,9 @@ class CassandraSink(Sink):
             print("Spark Config", key, sparkconfig[key])
             if value.startswith("$Input$"):
                 inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
+                sparkhelper.add_conf(key, inputdict[inputkey])
             else:
-                sparkhelper.addConf(key, value)
+                sparkhelper.add_conf(key, value)
 
     def write(self, sparksession, sparkdf):
         """
@@ -140,7 +138,7 @@ class CassandraSink(Sink):
         session = cluster.connect(self.keyspace)
         
         
-        session.execute(self.buildDeleteTable(sparkdf))
+        session.execute(self.buildDeleteTable())
         
         query = self.buildCreateTable(sparkdf)
         session.execute(query)
@@ -165,7 +163,7 @@ class CassandraSink(Sink):
         self.logger.debug("Create table query " + query)
         return query
 
-    def buildDeleteTable(self, sparkdf):
+    def buildDeleteTable(self):
         """
         Builds simple cassandra query for deleting table
         """
@@ -175,7 +173,7 @@ class CassandraSink(Sink):
 
     def buildKeyspaceQuery(self, keyspace):
         """
-        Builds cassandra query for creating keyspace
+        Builds cassandra query for creating keyspace 1
         """
         query = (
             "CREATE KEYSPACE IF NOT EXISTS "
diff --git a/dataextraction/sink/DefaultSparkSink.py b/dataextraction/sink/DefaultSparkSink.py
index a66b6ee..88b552e 100644
@@ -29,11 +29,9 @@ class DefaultSparkSink(Sink):
         @Methond: Constructor
         @Input : classflavor
         """
-        self.ClassType="Default"
+        self.class_type="Default"
         self.flavour = classflavour
         self.logger = None
-        classconfig = None
-        sparkconfig  = None
         self.sparkloadkey = None
         self.writetype = None
     def init(self, sparkhelper, confighelper, inputdict):
@@ -52,9 +50,9 @@ class DefaultSparkSink(Sink):
             print("Spark Config",key,sparkconfig[key])
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key,inputdict[inputkey])
+                sparkhelper.add_conf(key,inputdict[inputkey])
             else:
-                sparkhelper.addConf(key,value)
+                sparkhelper.add_conf(key,value)
     def write(self, sparksession, sparkdf):
         """
         @Method: write
diff --git a/dataextraction/sink/SinkClassConfig.ini b/dataextraction/sink/SinkClassConfig.ini
index a00ee13..5e8f7c4 100644
@@ -24,7 +24,7 @@ DefaultClassName=DefaultSparkSink
 [CassandraSink]
 Name=CassandraDB
 Description= Spark connector that Supports Writing from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 WriteMode=overwrite
 
diff --git a/dataextraction/source/Base.py b/dataextraction/source/Base.py
index 5249c3b..e46cd94 100644
@@ -28,7 +28,7 @@ class Source():
         """
         @Constructor No Args
         """
-        self.ClassType="Custom"
+        self.class_type="Custom"
     @abstractmethod
     def init(self,sparkhelper,confighelper,inputdict):
         """
diff --git a/dataextraction/source/CassandraSource.py b/dataextraction/source/CassandraSource.py
index 7911011..f2fc8a9 100644
@@ -31,7 +31,7 @@ class CassandraSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType="Custom"
+        self.class_type="Custom"
         self.flavour=classflavour
         self.logger=None
         self.sparkloadkey=None
@@ -45,12 +45,10 @@ class CassandraSource(Source):
         self.logger = confighelper.getLogger()
         self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour)
         classconfig,sparkconfig  = confighelper.getClassConfig(self)
-        envConf = confighelper.getEnvConfig()
-        
-        
+        env_conf = confighelper.getEnvConfig()
         self.sparkloadkey = classconfig["SparkLoadKey"]
-        self.keyspace  = envConf['cassandra_sourcedb']
-        self.table = envConf['cassandra_sourcetable']
+        self.keyspace  = env_conf['cassandra_sourcedb']
+        self.table = env_conf['cassandra_sourcetable']
         
         self.logger.debug("Source keyspace:" +  str(self.keyspace) + " table-" + str(self.table))
         
@@ -59,9 +57,9 @@ class CassandraSource(Source):
             value = sparkconfig[key]
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key,inputdict[inputkey])
+                sparkhelper.add_conf(key,inputdict[inputkey])
             else:
-                sparkhelper.addConf(key,value)
+                sparkhelper.add_conf(key,value)
             
         self.logger.debug("Spark cassandra source initialized as" + self.flavour)
 
diff --git a/dataextraction/source/DefaultSparkSource.py b/dataextraction/source/DefaultSparkSource.py
index 722942a..fdec526 100644
@@ -29,7 +29,7 @@ class DefaultSparkSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType="Default"
+        self.class_type="Default"
         self.flavour=classflavour
         self.logger=None
         self.sparkloadkey=None
@@ -47,9 +47,9 @@ class DefaultSparkSource(Source):
             value = sparkconfig[key]
             if value.startswith('$Input$'):
                 inputkey = value[7:]
-                sparkhelper.addConf(key,inputdict[inputkey])
+                sparkhelper.add_conf(key,inputdict[inputkey])
             else:
-                sparkhelper.addConf(key,value)
+                sparkhelper.add_conf(key,value)
         self.logger.debug("Spark Default Source Initialized as"+self.flavour)
     def load(self,sparksession):
         """
diff --git a/dataextraction/source/InfluxSource.py b/dataextraction/source/InfluxSource.py
index 4103d53..de04a2f 100644
@@ -32,7 +32,7 @@ class InfluxSource(Source):
         """
         @Method: Single Arg Constructor
         """
-        self.ClassType="Custom"
+        self.class_type="Custom"
         self.flavour=classflavour
         self.logger  = None
         self.classconfig  = None
diff --git a/dataextraction/source/SourceClassConfig.ini b/dataextraction/source/SourceClassConfig.ini
index a5ec5db..f8af9ed 100644
@@ -18,7 +18,7 @@
 [InfluxSource]
 Name=Influxdb
 Description= Spark connector that Supports Reading from InfluxDB
-ClassType=Custom
+class_type=Custom
 url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
 token = $ENV{Influx_Token}
 org = $ENV{Influx_DBHeirarchyL1}
@@ -32,7 +32,7 @@ query=this query for influx db
 [CassandraSource]
 Name=Cassandradb
 Description= Spark connector that Supports Reading from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 
 [CassandraSource.SparkConf]
diff --git a/dataextraction/tmgr_logger.py b/dataextraction/tmgr_logger.py
index fa464d0..93cb199 100644
@@ -40,7 +40,7 @@ class TMLogger(object):# pylint: disable=too-few-public-methods
             with open(conf_file, 'r') as file:
                 log_config = yaml.safe_load(file.read())
             logging.config.dictConfig(log_config)
-            self.LogLevel = log_config["root"]["level"]
+            self.log_level = log_config["root"]["level"]
             self.logger = logging.getLogger(__name__)
         except FileNotFoundError as err:
             print("error opening yaml config file")
@@ -58,6 +58,6 @@ class TMLogger(object):# pylint: disable=too-few-public-methods
         return self.logger
     
     @property
-    def get_logLevel(self):
-        return self.LogLevel
+    def get_log_level(self):
+        return self.log_level
     
diff --git a/dataextraction/transform/Base.py b/dataextraction/transform/Base.py
index 2243dfc..ac1c4c3 100644
@@ -29,7 +29,7 @@ class Transform():
         """
         @Method: No Args Constructor
         """
-        self.ClassType="Custom"
+        self.class_type="Custom"
     @abstractmethod
     def init(self, sparkhelper, confighelper, inputdict):
         """
diff --git a/dataextraction/transform/DefaultSparkTransform.py b/dataextraction/transform/DefaultSparkTransform.py
index 0d0c927..408d5f2 100644
@@ -28,7 +28,7 @@ class DefaultSparkTransform(Transform):
         """
         @Method:constructor
         """
-        self.ClassType="Default"
+        self.class_type="Default"
         self.flavour=classflavour
     def init(self,sparkhelper, confighelper,inputdict):
         """
diff --git a/dataextraction/transform/TransformClassConfig.ini b/dataextraction/transform/TransformClassConfig.ini
index e54c49b..abbd770 100644
@@ -18,7 +18,7 @@
 [SQLTransform]
 Name=SQLTransformer
 Description= SQLTransformer Transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
 
 [ModuleDetails]
 ModuleName=transform
diff --git a/test/sink/SinkClassConfig.ini b/test/sink/SinkClassConfig.ini
index a00ee13..5e8f7c4 100644
@@ -24,7 +24,7 @@ DefaultClassName=DefaultSparkSink
 [CassandraSink]
 Name=CassandraDB
 Description= Spark connector that Supports Writing from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 WriteMode=overwrite
 
diff --git a/test/source/SourceClassConfig.ini b/test/source/SourceClassConfig.ini
index 15ff6b5..e5b83c8 100644
@@ -19,7 +19,7 @@
 [InfluxSource]
 Name=Influxdb
 Description= Spark connector that Supports Reading from InfluxDB
-ClassType=Custom
+class_type=Custom
 url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
 token = $ENV{Influx_Token}
 org = $ENV{Influx_DBHeirarchyL1}
@@ -33,7 +33,7 @@ query=this query for influx db
 [CassandraSource]
 Name=Cassandradb
 Description= Spark connector that Supports Reading from CassandraDB
-ClassType=Custom
+class_type=Custom
 SparkLoadKey=org.apache.spark.sql.cassandra
 
 [CassandraSource.SparkConf]
diff --git a/test/test_DefaultSparkSource.py b/test/test_DefaultSparkSource.py
index e1a1b8e..483caad 100644
@@ -35,7 +35,7 @@ class Spark_helper:
         self.test_result = False
         self.read = helper2()
     
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         self.test_result = True
     
     def load(self):
diff --git a/test/test_cassandrasink.py b/test/test_cassandrasink.py
index 9349d47..17e6cad 100644
@@ -32,7 +32,7 @@ class sparkSessionHelper():
     def __init__(self):
         pass
 
-    def addConf(self, key, value):
+    def add_conf(self, key, value):
         pass
 
 class df_helper():
diff --git a/test/test_main.py b/test/test_main.py
index a2499f5..d003093 100644
@@ -29,9 +29,9 @@ sys.path.extend(["dataextraction/"]) # In order to import dataextraction functio
 
 
 import main
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
 from Pipeline import Pipeline
-from main import task
+from main import Task
 
 
 
@@ -66,7 +66,7 @@ class Test_task_status:
      
     def test_task_status(self):  
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json ,"Accepted")
+        main.task_map["unittest_task"] = Task(request_json ,"Accepted")
         response = self.client.get("/task-status/unittest_task")
         main.task_map.clear()
         assert response.content_type ==  'application/json'
@@ -75,7 +75,7 @@ class Test_task_status:
     #Error test
     def test_negative_task_status(self):  
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json ,"Error")
+        main.task_map["unittest_task"] = Task(request_json ,"Error")
         response = self.client.get("/task-status/unittest_task")
         main.task_map.clear()
        
@@ -96,7 +96,7 @@ class Test_delete_task_status:
 
     def test_delete_task_status(self):
         request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
-        main.task_map["unittest_task"] = task(request_json ,"Accepted")
+        main.task_map["unittest_task"] = Task(request_json ,"Accepted")
         response = self.client.delete("/delete-task-status/unittest_task")
         main.task_map.clear()
         assert response.content_type ==  'application/json'
@@ -112,8 +112,8 @@ class Test_async_code_worker:
     def setup_method(self):
         self.client = main.app.test_client(self)
     
-    @patch('main.session_helper.getSession')
-    @patch('main.factory.getBatchPipeline')
+    @patch('main.session_helper.get_session')
+    @patch('main.factory.get_batch_pipeline')
     def test_negative_async_code_worker_1(self,mock1,mock2):
         
         main.infinte_loop_config["infinte_run"]= "True"
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index 62f0d10..e48e5b7 100644
@@ -25,7 +25,7 @@ sys.path.extend(["dataextraction/"])
 
 from Pipeline import Pipeline
 from FeatureEngineeringFactory import FeatureEngineeringFactory
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
 
 
 
@@ -48,10 +48,10 @@ class Test_Pipeline:
         api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
         load_dotenv('test/test_env.env')
         os.environ['CODE_DIR_PATH'] = 'test'
-        session_helper = sparkSessionManager()
+        session_helper = SparkSessionManager()
         factory = FeatureEngineeringFactory(session_helper)
         (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
-        self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+        self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
         self.spark_session = session_helper
 
     
@@ -60,27 +60,27 @@ class Test_Pipeline:
 
         
 
-    def test_loadData(self):
+    def test_load_data(self):
         assert self.obj != None, 'Pipeline Object Creation, Failed'
         self.obj.sources[0] = helper()
-        self.obj.loadData(self.spark_session )
+        self.obj.load_data(self.spark_session )
         assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed'
 
-    def test_transformData(self):
+    def test_transform_data(self):
         self.obj.transformers[0] = helper()
-        self.obj.transformData(self.spark_session)
+        self.obj.transform_data(self.spark_session)
 
         assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed'
 
-    def test_transformDataWithNoTransform(self):
+    def test_transform_data_with_no_transform(self):
         self.obj.transformers = None
         self.obj.spark_dflist = 'Data Transform Completed'
-        self.obj.transformData(self.spark_session)
+        self.obj.transform_data(self.spark_session)
         assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed When No Transformer is specified'
 
-    def test_writeData(self):
+    def test_write_data(self):
         self.obj.sinks[0] = helper()
-        self.obj.writeData(self.spark_session)
+        self.obj.write_data(self.spark_session)
         assert True
 
 
diff --git a/test/test_tmgr_logger.py b/test/test_tmgr_logger.py
index 99501e1..07a1d6f 100644
@@ -36,7 +36,7 @@ def test_get_logLevel():
     load_dotenv('test/test_env.env') 
     os.environ['CODE_DIR_PATH']='test'
     tm_logger = TMLogger("test/log_config.yaml")
-    assert None != tm_logger.get_logLevel
+    assert None != tm_logger.get_log_level
 
 def test_init_withWrongFile():
     load_dotenv('test/test_env.env') 
diff --git a/test/transform/TransformClassConfig.ini b/test/transform/TransformClassConfig.ini
index c63fa4b..bbcf30e 100644
@@ -19,7 +19,7 @@
 [SQLTransform]
 Name=SQLTransformer
 Description= SQLTransformer Transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
 
 [ModuleDetails]
 ModuleName=transform