Adding Unit Test Cases 26/9826/2
authorashishj1729 <jain.ashish@samsung.com>
Fri, 25 Nov 2022 10:31:16 +0000 (16:01 +0530)
committerashishj1729 <jain.ashish@samsung.com>
Tue, 29 Nov 2022 06:06:03 +0000 (11:36 +0530)
Issue-id: AIMLWF-6

Signed-off-by: ashishj1729 <jain.ashish@samsung.com>
Change-Id: Ic7ca6adb1414529e529e5a04656a2e2698433bf5
Signed-off-by: ashishj1729 <jain.ashish@samsung.com>
20 files changed:
dataextraction/ConfigHelper.py
dataextraction/main.py
dataextraction/source/InfluxSource.py
dataextraction/tmgr_logger.py
test/SparkConfig.ini [new file with mode: 0644]
test/log_config.yaml [new file with mode: 0644]
test/requirements_tests.txt [new file with mode: 0644]
test/sink/SinkClassConfig.ini [new file with mode: 0644]
test/source/SourceClassConfig.ini [new file with mode: 0644]
test/test_DefaultSparkSource.py [new file with mode: 0644]
test/test_cassandrasink.py [new file with mode: 0644]
test/test_confighelper.py [new file with mode: 0644]
test/test_defaultsparksink.py [new file with mode: 0644]
test/test_defaultsparktransform.py [new file with mode: 0644]
test/test_env.env [new file with mode: 0644]
test/test_influxsource.py [new file with mode: 0644]
test/test_main.py [new file with mode: 0644]
test/test_pipeline.py [new file with mode: 0644]
test/test_tmgr_logger.py [new file with mode: 0644]
test/transform/TransformClassConfig.ini [new file with mode: 0644]

index a290b91..3f408ba 100644 (file)
@@ -49,7 +49,11 @@ class ConfigHelper(metaclass=Singleton):
         try:
 
             self.exec_path = os.getenv('CODE_DIR_PATH')
-            self.tm_logger = TMLogger("../config/log_config.yaml")
+            
+            if self.exec_path is None:
+                self.tm_logger = TMLogger("../config/log_config.yaml")
+            else:
+                self.tm_logger = TMLogger(self.exec_path + "/log_config.yaml")
 
 
             self.logger = self.tm_logger.logger
index 566f94b..20932cf 100644 (file)
@@ -41,6 +41,7 @@ session_helper = sparkSessionManager()
 factory = FeatureEngineeringFactory(session_helper)
 tasks = queue.Queue()
 task_map = {}
+infinte_loop_config={"infinte_run":"True", "unit_test_mode":"False"}
 
 class task():
     """
@@ -156,7 +157,7 @@ def async_code_worker():
     AsyncCode Worker
     Infinite loop which will retrive and process tasks assigned for executing data extraction
     """
-    while True:
+    while infinte_loop_config["infinte_run"] == "True":
         try:
             start_time = datetime.datetime.now()
             logger.debug(str(start_time) +"Feature Engineering Pipeline Started")
@@ -185,6 +186,9 @@ def async_code_worker():
             api_result_msg = str(exc)
             task_map[task_id].status = "Error"
             task_map[task_id].error = api_result_msg
+        if infinte_loop_config["unit_test_mode"] == "True":
+            infinte_loop_config["infinte_run"] = "False"
+            logger.debug(infinte_loop_config)
 if __name__ == "__main__":
     print("******Initiaizing feature store API ******" )
     threading.Thread(target=async_code_worker, daemon=True).start()
index 87c8965..4103d53 100644 (file)
@@ -28,21 +28,6 @@ class InfluxSource(Source):
     """
     @Class : Influx DB Source
     """
-    def __init__(self):
-        """
-        @No Args Constructore
-        """
-        super().__init__()
-        self.logger  = None
-        self.classconfig  = None
-        self.url     = None
-        self.token   = None
-        self.org     = None
-        self.timeout = None
-        self.ssl     = None
-        self.query = None
-        
-        
     def __init__(self,classflavour):
         """
         @Method: Single Arg Constructor
index 0c708c6..fa464d0 100644 (file)
@@ -45,6 +45,7 @@ class TMLogger(object):# pylint: disable=too-few-public-methods
         except FileNotFoundError as err:
             print("error opening yaml config file")
             print(err)
+            raise Exception("error opening yaml config file")
 
     @property
     def get_logger(self):
diff --git a/test/SparkConfig.ini b/test/SparkConfig.ini
new file mode 100644 (file)
index 0000000..7ee005b
--- /dev/null
@@ -0,0 +1,6 @@
+[BaseConfig]
+DefaultAppName=Pipeline
+DefaultMaster=local[1]
+Override_Log_Level=ERROR
+[ExtraConfig]
+spark.logConf=true
diff --git a/test/log_config.yaml b/test/log_config.yaml
new file mode 100644 (file)
index 0000000..35d7ff0
--- /dev/null
@@ -0,0 +1,39 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+version: 1
+formatters:
+  simple:
+    format: '%(asctime)s | %(filename)s %(lineno)s %(funcName)s() |  %(levelname)s | %(message)s'
+handlers:
+  console:
+    class: logging.StreamHandler
+    level: DEBUG
+    formatter: simple
+    stream: ext://sys.stdout
+  access_file:
+    class: logging.handlers.RotatingFileHandler
+    level: DEBUG
+    formatter: simple
+    filename: fs.log
+    maxBytes: 10485760
+    backupCount: 20
+    encoding: utf8
+root:
+    level: DEBUG
+    handlers: [access_file,console]
diff --git a/test/requirements_tests.txt b/test/requirements_tests.txt
new file mode 100644 (file)
index 0000000..867c022
--- /dev/null
@@ -0,0 +1,12 @@
+pytest-cov
+python-dotenv
+pyspark
+cassandra-driver==3.25.0
+jsonpickle==2.0.0
+Flask==2.0.1
+Flask-API==3.0.post1
+Flask-RESTful==0.3.9
+lru-dict==1.1.7
+influxdb_client
+mock
+pandas
\ No newline at end of file
diff --git a/test/sink/SinkClassConfig.ini b/test/sink/SinkClassConfig.ini
new file mode 100644 (file)
index 0000000..7a12e9a
--- /dev/null
@@ -0,0 +1,36 @@
+[ModuleDetails]
+ModuleName=sink
+BaseClassName=Sink
+DefaultClassName=DefaultSparkSink
+
+[CassandraSink]
+Name=CassandraDB
+Description= Spark connector that Supports Writing from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+WriteMode=overwrite
+
+[CassandraSink.SparkConf]
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[EnvConfig]
+FS_DB_USER=FS_DB_USER
+FS_DB_PASSWORD=FS_DB_PASSWORD
+FS_DB_IP=FS_DB_IP
+FS_DB_PORT=FS_DB_PORT
+FS_DB_NAME=FS_DB_KEYSPACE_NAME
+
+
+Cassandra_SinkIP=FS_DB_IP
+Cassandra_SinkPort=FS_DB_PORT
+Cassandra_SinkUser=FS_DB_USER
+Cassandra_SinkPassword=FS_DB_PASSWORD
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
+
+Cassandra_SinkDB=FS_DB_KEYSPACE_NAME
+
+
+
diff --git a/test/source/SourceClassConfig.ini b/test/source/SourceClassConfig.ini
new file mode 100644 (file)
index 0000000..5e44f6a
--- /dev/null
@@ -0,0 +1,50 @@
+[InfluxSource]
+Name=Influxdb
+Description= Spark connector that Supports Reading from InfluxDB
+ClassType=Custom
+url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
+token = $ENV{Influx_Token}
+org = $ENV{Influx_DBHeirarchyL1}
+timeout = 100000
+ssl = False
+Query = $Input$query
+
+[InfluxSource.Inputs]
+query=this query for influx db
+
+[CassandraSource]
+Name=Cassandradb
+Description= Spark connector that Supports Reading from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+
+[CassandraSource.SparkConf]
+spark.cassandra.connection.host=$ENV{Cassandra_SourceIP}
+spark.cassandra.connection.port=$ENV{Cassandra_SourcePort}
+spark.cassandra.auth.username=$ENV{Cassandra_SourceUser}
+spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword}
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[ModuleDetails]
+ModuleName=source
+BaseClassName=Source
+DefaultClassName=DefaultSparkSource
+
+[EnvConfig]
+Influx_SourceIP=Influx_DATALAKE_IP
+Influx_SourcePort=Influx_DATALAKE_PORT
+Influx_MonSourcePort=Influx_DATALAKE_PORT
+Influx_DBHeirarchyL1=Influx_ORG_NAME
+Influx_DBHeirarchyL2=Influx_BUCKET_NAME
+Influx_Token=Influx_Token
+
+Cassandra_SourceIP=Cassandra_DATALAKE_IP
+Cassandra_SourcePort=Cassandra_DATALAKE_PORT
+Cassandra_SourceUser=Cassandra_DATALAKE_USER
+Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD
+Cassandra_SourceDB=Cassandra_KEYSPACE_NAME
+Cassandra_SourceTable=Cassandra_TABLE_NAME
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
\ No newline at end of file
diff --git a/test/test_DefaultSparkSource.py b/test/test_DefaultSparkSource.py
new file mode 100644 (file)
index 0000000..e1a1b8e
--- /dev/null
@@ -0,0 +1,68 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+sys.path.extend(["dataextraction/"])
+from source.DefaultSparkSource import DefaultSparkSource
+from ConfigHelper import ConfigHelper
+
+class helper2:
+    def __init__(self):
+        pass
+    def format(self, input1):
+        return Spark_helper()
+
+class Spark_helper:
+    def __init__(self):
+        self.test_result = False
+        self.read = helper2()
+    
+    def addConf(self, key, value):
+        self.test_result = True
+    
+    def load(self):
+        return 'Spark Session Loaded'
+    
+
+
+class Test_DefaultSparkSource:
+    def setup_method(self):
+        os.environ['CODE_DIR_PATH'] = 'test'
+        load_dotenv('test/test_env.env')
+        self.obj = DefaultSparkSource("CassandraSource")
+        
+
+
+    def test_init(self):
+        assert self.obj != None, 'DefaultSparkSource (Single args) Object Creation, Failed'
+        
+    def test_init3(self):
+        load_dotenv('test/test_env.env')
+        spark_helper_obj = Spark_helper()
+        self.obj.init(spark_helper_obj, ConfigHelper(), {})
+        assert spark_helper_obj.test_result, 'Intialisation of Spark Source Failed'
+
+    def test_load(self):
+        spark_helper_obj = Spark_helper()
+        self.obj.init(spark_helper_obj, ConfigHelper(), {})
+        out = self.obj.load(spark_helper_obj)
+
+        assert out == 'Spark Session Loaded' , 'Data Extraction Failed'
\ No newline at end of file
diff --git a/test/test_cassandrasink.py b/test/test_cassandrasink.py
new file mode 100644 (file)
index 0000000..9349d47
--- /dev/null
@@ -0,0 +1,80 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'   
+load_dotenv('test/test_env.env') # Loading Env Variables 
+from sink.CassandraSink import CassandraSink
+from ConfigHelper import ConfigHelper
+
+class sparkSessionHelper():
+    def __init__(self):
+        pass
+
+    def addConf(self, key, value):
+        pass
+
+class df_helper():
+    def __init__(self):
+        self.schema = self
+        self.names = ['Id', 'DLPRB']
+        self.write = self
+        self.saved = False
+
+    def select(self, query):
+        return self
+    
+    def withColumn(self, col_name, order):
+        return self
+    
+    def format(self, key):
+        return self
+    
+    def options(self, **kwargs):
+        return self
+    
+    def mode(self, mode):
+        return self
+    
+    def save(self):
+        self.saved = True
+
+
+class Test_CassandraSink():
+    def setup_method(self):
+        self.obj = CassandraSink("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'CollectionName': 'last_check3'}
+        self.obj.init(sparkSessionHelper(), fsConf, input_dict)
+
+    def test_init(self):
+        assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
+    
+    @patch('sink.CassandraSink.Cluster')
+    @patch('sink.CassandraSink.Cluster.connect')
+    @patch('sink.CassandraSink.monotonically_increasing_id')
+    def test_write(self, mock1, mock2, mock3):
+        df_helper_obj = df_helper()
+        self.obj.write(sparkSessionHelper(),df_helper_obj)
+        assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file
diff --git a/test/test_confighelper.py b/test/test_confighelper.py
new file mode 100644 (file)
index 0000000..0cbd154
--- /dev/null
@@ -0,0 +1,65 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+sys.path.extend([["dataextraction/"]])
+from ConfigHelper import ConfigHelper
+from tmgr_logger import TMLogger
+from sink.CassandraSink import CassandraSink
+
+
+def test_initWithCodePath():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    fsConf = ConfigHelper()
+    assert None != fsConf
+
+def test_getClassConfig():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    fsConf = ConfigHelper()   
+    assert None != fsConf.getClassConfig(CassandraSink("sink"))
+
+def test_getEnvConfig():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    assert None != ConfigHelper().getEnvConfig()
+
+def test_getLogLevel():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    assert None != ConfigHelper().getLogLevel()
+
+def test_getLogger():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    assert None != ConfigHelper().getLogger()
+
+def test_getFsHost():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    assert None != ConfigHelper().getFsHost()
+
+def test_getFsPort():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH']='test'
+    assert None != ConfigHelper().getFsPort()
\ No newline at end of file
diff --git a/test/test_defaultsparksink.py b/test/test_defaultsparksink.py
new file mode 100644 (file)
index 0000000..7decd93
--- /dev/null
@@ -0,0 +1,63 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env') 
+os.environ['CODE_DIR_PATH']='test'       
+sys.path.extend([["dataextraction/"]])
+
+from sink.DefaultSparkSink import DefaultSparkSink
+from ConfigHelper import ConfigHelper
+import main
+
+class helper:
+    '''
+        In order to mimic/mock the Line sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save() for write
+    '''
+    def __init__(self):
+        self.save_status = False
+        self.write = self
+    
+    def format(self, sparkloadkey):
+        return self
+    
+    def mode(self, write_type):
+        return self
+    
+    def save(self):
+        self.save_status = True
+
+class Test_DefaultSparkSink:
+    def setup_method(self):
+        self.sink_obj = DefaultSparkSink("CassandraSink")
+        fsConf = ConfigHelper()
+        input_dict = {'CollectionName': 'last_check3'}
+        self.sink_obj.init(main.session_helper,fsConf,input_dict)
+
+    def test_init_sparksink(self):
+        assert None != self.sink_obj.logger
+        
+
+    def test_write(self):
+        helper_obj = helper()
+        self.sink_obj.write(None , helper_obj)
+        assert helper_obj.save_status
\ No newline at end of file
diff --git a/test/test_defaultsparktransform.py b/test/test_defaultsparktransform.py
new file mode 100644 (file)
index 0000000..abacc3f
--- /dev/null
@@ -0,0 +1,38 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env') 
+os.environ['CODE_DIR_PATH']='test'       
+sys.path.extend(["dataextraction/"])
+from transform.DefaultSparkTransform import DefaultSparkTransform
+from ConfigHelper import ConfigHelper
+import main
+
+def test_init_default_spark_transform():
+    fsConf = ConfigHelper()
+    trans_obj = DefaultSparkTransform("transform")
+    input_dict = {'CollectionName': 'last_check3'}
+    trans_obj.init(main.session_helper,fsConf,input_dict)
+    trans_obj.transform(main.session_helper, None) # Remove this when Transform is implemented properly, and write another test_case for transform then!!
+    assert None != trans_obj
+
diff --git a/test/test_env.env b/test/test_env.env
new file mode 100644 (file)
index 0000000..9492f6f
--- /dev/null
@@ -0,0 +1,22 @@
+#change the values accordingly and all these env variables via 'source' command
+FS_API_HOST=''
+FS_API_PORT=1111
+FS_DB_IP=''
+FS_DB_PORT=1234
+FS_DB_USER='dummy_user'
+FS_DB_PASSWORD=''
+FS_DB_KEYSPACE_NAME='dummy_namespace'
+#Influx Data Lake
+Influx_DATALAKE_IP=
+Influx_DATALAKE_PORT=1235
+Influx_ORG_NAME='dummy_org'
+Influx_BUCKET_NAME='dummy_org' 
+Influx_Token=
+#Cassandra DB Datalake
+Cassandra_DATALAKE_IP=
+Cassandra_DATALAKE_PORT=54321
+Cassandra_DATALAKE_USER='dummy_cas_user'
+Cassandra_DATALAKE_PASSWORD=''
+Cassandra_KEYSPACE_NAME='dummy_cass_namespace'
+Cassandra_TABLE_NAME='dummy_table_name'
+Cassandra_CONS_LEVEL='ONE'
\ No newline at end of file
diff --git a/test/test_influxsource.py b/test/test_influxsource.py
new file mode 100644 (file)
index 0000000..d028e31
--- /dev/null
@@ -0,0 +1,60 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'   
+load_dotenv('test/test_env.env') # Loading Env Variables 
+from source.InfluxSource import InfluxSource
+from ConfigHelper import ConfigHelper
+
+
+
+class helper():
+    def __init__(self):
+        pass
+
+    def createDataFrame(self, data_df):
+        return "Data Loaded Successfully"
+
+class Test_InfluxSource():
+    def setup_method(self):
+        # self.obj_zero_args = InfluxSource()
+        self.obj_one_arg = InfluxSource("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'query': 'Select * from last_check3'}
+        self.obj_one_arg.init(None, fsConf, input_dict)
+
+    def test_init(self):
+        assert self.obj_one_arg.logger != None, "Influx Source Object Creation Failed"
+    
+    def test_negative_init(self):
+        influx_source_obj = InfluxSource("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'dummy-key': 'dummy-value'}
+        influx_source_obj.init(None, fsConf, input_dict)
+        assert influx_source_obj.logger != None, "Influx Source Object Creation Failed"
+
+    @patch('source.InfluxSource.InfluxDBClient')
+    def test_load(self, mock1):
+        out = self.obj_one_arg.load(helper())
+        assert out == "Data Loaded Successfully", "Data Failed to Load"
diff --git a/test/test_main.py b/test/test_main.py
new file mode 100644 (file)
index 0000000..a2499f5
--- /dev/null
@@ -0,0 +1,145 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+import json
+from mock import patch
+from flask_api import status
+
+load_dotenv('test/test_env.env') # Loading Env Variables 
+os.environ['CODE_DIR_PATH']='test' # Telling ConfigHelper to look into test folder for log_config.yaml     
+sys.path.extend(["dataextraction/"]) # In order to import dataextraction functions
+
+
+import main
+from SparkHelper import sparkSessionManager
+from Pipeline import Pipeline
+from main import task
+
+
+
+def empyty_tasks():
+    while(main.tasks.qsize() > 0):
+        task_id = main.tasks.get()
+        main.tasks.task_done()
+
+class Test_feature_groups:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+        
+
+    def test_feature_group(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        empyty_tasks()
+        assert response.content_type ==  'application/json'
+        assert response.status_code ==  status.HTTP_200_OK
+
+    #Error test
+    def test_negative_feature_groups(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}]}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json') 
+        assert response.content_type ==  'text/html; charset=utf-8'
+        
+
+
+class Test_task_status:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+     
+    def test_task_status(self):  
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json ,"Accepted")
+        response = self.client.get("/task-status/unittest_task")
+        main.task_map.clear()
+        assert response.content_type ==  'application/json'
+        assert response.status_code ==  status.HTTP_200_OK
+
+    #Error test
+    def test_negative_task_status(self):  
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json ,"Error")
+        response = self.client.get("/task-status/unittest_task")
+        main.task_map.clear()
+       
+        assert str(response.status) == "500 INTERNAL SERVER ERROR", "Test_negative_task_status doesn't retured response code 500"
+
+class Test_task_statuses:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+     
+    def test_task_statuses(self):  
+        response = self.client.get("/task-statuses")
+        assert None != response
+
+
+class Test_delete_task_status:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    def test_delete_task_status(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json ,"Accepted")
+        response = self.client.delete("/delete-task-status/unittest_task")
+        main.task_map.clear()
+        assert response.content_type ==  'application/json'
+        assert response.status_code ==  status.HTTP_200_OK
+
+    #error scenario
+    def test_negative_delete_task_status(self):
+        response = self.client.delete("/delete-task-status/unittest_task")
+        assert response.content_type ==  'application/json'
+        assert response.status_code ==  status.HTTP_500_INTERNAL_SERVER_ERROR
+
+class Test_async_code_worker:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+    
+    @patch('main.session_helper.getSession')
+    @patch('main.factory.getBatchPipeline')
+    def test_negative_async_code_worker_1(self,mock1,mock2):
+        
+        main.infinte_loop_config["infinte_run"]= "True"
+        main.infinte_loop_config["unit_test_mode"]= "True"
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
+        
+    
+    #error
+    def test_negative_async_code_worker_2(self):
+        
+        main.infinte_loop_config["infinte_run"]= "True"
+        main.infinte_loop_config["unit_test_mode"]= "True"
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
+
+    #error with Cassandra Source
+    def test_negative_async_code_worker_3(self):
+
+        main.infinte_loop_config["infinte_run"]= "True"
+        main.infinte_loop_config["unit_test_mode"]= "True"
+        request_json = {'source': {'CassandraSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
\ No newline at end of file
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
new file mode 100644 (file)
index 0000000..62f0d10
--- /dev/null
@@ -0,0 +1,92 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+
+from Pipeline import Pipeline
+from FeatureEngineeringFactory import FeatureEngineeringFactory
+from SparkHelper import sparkSessionManager
+
+
+
+class helper:
+    '''Helper class to Mimic data Load, Transform and Sink'''
+    def __init__(self):
+        pass
+
+    def load(self, sparksession):
+        return 'Data Load Completed'
+    
+    def transform(self, sparksession, df_list):
+        return 'Data Transform Completed'
+    
+    def write(self, sparksession, transform_df_list):
+        return 'Data Written to Sink'
+
+class Test_Pipeline:
+    def setup_method(self):
+        api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
+        load_dotenv('test/test_env.env')
+        os.environ['CODE_DIR_PATH'] = 'test'
+        session_helper = sparkSessionManager()
+        factory = FeatureEngineeringFactory(session_helper)
+        (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
+        self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+        self.spark_session = session_helper
+
+    
+    def test_init_pipeline(self):
+        assert self.obj != None, 'Pipeline Object Creation, Failed'
+
+        
+
+    def test_loadData(self):
+        assert self.obj != None, 'Pipeline Object Creation, Failed'
+        self.obj.sources[0] = helper()
+        self.obj.loadData(self.spark_session )
+        assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed'
+
+    def test_transformData(self):
+        self.obj.transformers[0] = helper()
+        self.obj.transformData(self.spark_session)
+
+        assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed'
+
+    def test_transformDataWithNoTransform(self):
+        self.obj.transformers = None
+        self.obj.spark_dflist = 'Data Transform Completed'
+        self.obj.transformData(self.spark_session)
+        assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed When No Transformer is specified'
+
+    def test_writeData(self):
+        self.obj.sinks[0] = helper()
+        self.obj.writeData(self.spark_session)
+        assert True
+
+
+    def test_execute(self):
+        self.obj.sources[0] = helper()
+        self.obj.transformers[0] = helper()
+        self.obj.sinks[0] = helper()
+        self.obj.execute(self.spark_session)
+        assert True
diff --git a/test/test_tmgr_logger.py b/test/test_tmgr_logger.py
new file mode 100644 (file)
index 0000000..99501e1
--- /dev/null
@@ -0,0 +1,47 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+# In order to import dataextraction functions
+sys.path.extend([["dataextraction/"]])
+
+from tmgr_logger import TMLogger
+
+def test_get_logger():
+    load_dotenv('test/test_env.env') 
+    os.environ['CODE_DIR_PATH']='test'
+    tm_logger = TMLogger("test/log_config.yaml")
+    assert None != tm_logger.get_logger
+
+def test_get_logLevel():
+    load_dotenv('test/test_env.env') 
+    os.environ['CODE_DIR_PATH']='test'
+    tm_logger = TMLogger("test/log_config.yaml")
+    assert None != tm_logger.get_logLevel
+
+def test_init_withWrongFile():
+    load_dotenv('test/test_env.env') 
+    os.environ['CODE_DIR_PATH']='test'
+    with pytest.raises(Exception) as exc:
+        tm_logger = TMLogger("bad_log_config.yaml")
+        
+    assert "error opening yaml config file" in str(exc.value)
\ No newline at end of file
diff --git a/test/transform/TransformClassConfig.ini b/test/transform/TransformClassConfig.ini
new file mode 100644 (file)
index 0000000..9f2f398
--- /dev/null
@@ -0,0 +1,11 @@
+[SQLTransform]
+Name=SQLTransformer
+Description= SQLTransformer Transforms your data using SQL statements
+ClassType=Custom
+
+[ModuleDetails]
+ModuleName=transform
+BaseClassName=Transform
+DefaultClassName=DefaultSparkTransform
+
+[EnvConfig]