From: ashishj1729
Date: Fri, 25 Nov 2022 10:31:16 +0000 (+0530)
Subject: Adding Unit Test Cases
X-Git-Tag: 1.0.0~7
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=f1bd56c9c0b60bc8a9bb6ea9c3b240658ff6ec77;p=aiml-fw%2Fathp%2Fdata-extraction.git

Adding Unit Test Cases

Issue-id: AIMLWF-6

Signed-off-by: ashishj1729
Change-Id: Ic7ca6adb1414529e529e5a04656a2e2698433bf5
Signed-off-by: ashishj1729
---

diff --git a/dataextraction/ConfigHelper.py b/dataextraction/ConfigHelper.py
index a290b91..3f408ba 100644
--- a/dataextraction/ConfigHelper.py
+++ b/dataextraction/ConfigHelper.py
@@ -49,7 +49,11 @@ class ConfigHelper(metaclass=Singleton):
         try:
             self.exec_path = os.getenv('CODE_DIR_PATH')
-            self.tm_logger = TMLogger("../config/log_config.yaml")
+
+            if self.exec_path is None:
+                self.tm_logger = TMLogger("../config/log_config.yaml")
+            else:
+                self.tm_logger = TMLogger(self.exec_path + "/log_config.yaml")
 
             self.logger = self.tm_logger.logger
diff --git a/dataextraction/main.py b/dataextraction/main.py
index 566f94b..20932cf 100644
--- a/dataextraction/main.py
+++ b/dataextraction/main.py
@@ -41,6 +41,7 @@ session_helper = sparkSessionManager()
 factory = FeatureEngineeringFactory(session_helper)
 tasks = queue.Queue()
 task_map = {}
+infinte_loop_config={"infinte_run":"True", "unit_test_mode":"False"}
 
 class task():
     """
@@ -156,7 +157,7 @@ def async_code_worker():
     """
     AsyncCode Worker
     Infinite loop which will retrive and process tasks assigned for executing data extraction
     """
-    while True:
+    while infinte_loop_config["infinte_run"] == "True":
         try:
             start_time = datetime.datetime.now()
             logger.debug(str(start_time) +"Feature Engineering Pipeline Started")
@@ -185,6 +186,9 @@ def async_code_worker():
             api_result_msg = str(exc)
             task_map[task_id].status = "Error"
             task_map[task_id].error = api_result_msg
+        if infinte_loop_config["unit_test_mode"] == "True":
+            infinte_loop_config["infinte_run"] = "False"
+            logger.debug(infinte_loop_config)
 
 if __name__ == "__main__":
     print("******Initiaizing feature store API ******" )
     threading.Thread(target=async_code_worker, daemon=True).start()
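The main.py change swaps the unconditional `while True` for a string flag held in `infinte_loop_config`, so a unit test can flip `unit_test_mode` and make the otherwise-infinite worker return after a single pass (test_main.py below does exactly this). A minimal, self-contained sketch of the same exit-flag pattern, with illustrative names:

    import queue

    # Exit-flag pattern from async_code_worker, reduced to a runnable sketch.
    # Note the flags are the strings "True"/"False", not booleans, matching main.py.
    loop_config = {"infinte_run": "True", "unit_test_mode": "False"}

    def worker(tasks):
        while loop_config["infinte_run"] == "True":
            item = tasks.get()                         # blocks for work, like the real worker
            print("processed", item)
            if loop_config["unit_test_mode"] == "True":
                loop_config["infinte_run"] = "False"   # exit after one iteration

    loop_config["unit_test_mode"] = "True"             # what the tests set before calling the worker
    q = queue.Queue()
    q.put("task-1")
    worker(q)                                          # returns instead of spinning forever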
diff --git a/dataextraction/source/InfluxSource.py b/dataextraction/source/InfluxSource.py
index 87c8965..4103d53 100644
--- a/dataextraction/source/InfluxSource.py
+++ b/dataextraction/source/InfluxSource.py
@@ -28,21 +28,6 @@ class InfluxSource(Source):
     """
     @Class : Influx DB Source
     """
-    def __init__(self):
-        """
-        @No Args Constructore
-        """
-        super().__init__()
-        self.logger = None
-        self.classconfig = None
-        self.url = None
-        self.token = None
-        self.org = None
-        self.timeout = None
-        self.ssl = None
-        self.query = None
-
-
     def __init__(self,classflavour):
         """
         @Method: Single Arg Constructor
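The deleted zero-argument constructor was dead code: Python does not overload methods, so the later `def __init__(self, classflavour)` silently rebinds the name and the first definition can never be called. A two-line demonstration of that semantics:

    class Demo:
        def __init__(self):                  # never reachable...
            self.flavour = None

        def __init__(self, classflavour):    # ...because this definition replaces it
            self.flavour = classflavour

    Demo("InfluxDb")                         # works
    # Demo()  -> TypeError: missing 1 required positional argument: 'classflavour'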
diff --git a/dataextraction/tmgr_logger.py b/dataextraction/tmgr_logger.py
index 0c708c6..fa464d0 100644
--- a/dataextraction/tmgr_logger.py
+++ b/dataextraction/tmgr_logger.py
@@ -45,6 +45,7 @@ class TMLogger(object):# pylint: disable=too-few-public-methods
         except FileNotFoundError as err:
             print("error opening yaml config file")
             print(err)
+            raise Exception("error opening yaml config file")
 
     @property
     def get_logger(self):
diff --git a/test/SparkConfig.ini b/test/SparkConfig.ini
new file mode 100644
index 0000000..7ee005b
--- /dev/null
+++ b/test/SparkConfig.ini
@@ -0,0 +1,6 @@
+[BaseConfig]
+DefaultAppName=Pipeline
+DefaultMaster=local[1]
+Override_Log_Level=ERROR
+[ExtraConfig]
+spark.logConf=true
diff --git a/test/log_config.yaml b/test/log_config.yaml
new file mode 100644
index 0000000..35d7ff0
--- /dev/null
+++ b/test/log_config.yaml
@@ -0,0 +1,39 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+version: 1
+formatters:
+  simple:
+    format: '%(asctime)s | %(filename)s %(lineno)s %(funcName)s() | %(levelname)s | %(message)s'
+handlers:
+  console:
+    class: logging.StreamHandler
+    level: DEBUG
+    formatter: simple
+    stream: ext://sys.stdout
+  access_file:
+    class: logging.handlers.RotatingFileHandler
+    level: DEBUG
+    formatter: simple
+    filename: fs.log
+    maxBytes: 10485760
+    backupCount: 20
+    encoding: utf8
+root:
+  level: DEBUG
+  handlers: [access_file,console]
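test/log_config.yaml is a standard `logging` dictConfig schema (`version: 1`) wired to a console handler and a rotating file handler. TMLogger presumably feeds it to `logging.config.dictConfig`; a sketch of that conventional loading path, which may differ from the project's actual code:

    import logging
    import logging.config

    import yaml

    with open("test/log_config.yaml", "r") as conf_file:
        log_config = yaml.safe_load(conf_file)      # parse the YAML into a dict
    logging.config.dictConfig(log_config)           # install console + rotating-file handlers

    logging.getLogger(__name__).debug("logging configured")   # lands on stdout and in fs.log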
diff --git a/test/requirements_tests.txt b/test/requirements_tests.txt
new file mode 100644
index 0000000..867c022
--- /dev/null
+++ b/test/requirements_tests.txt
@@ -0,0 +1,12 @@
+pytest-cov
+python-dotenv
+pyspark
+cassandra-driver==3.25.0
+jsonpickle==2.0.0
+Flask==2.0.1
+Flask-API==3.0.post1
+Flask-RESTful==0.3.9
+lru-dict==1.1.7
+influxdb_client
+mock
+pandas
\ No newline at end of file
diff --git a/test/sink/SinkClassConfig.ini b/test/sink/SinkClassConfig.ini
new file mode 100644
index 0000000..7a12e9a
--- /dev/null
+++ b/test/sink/SinkClassConfig.ini
@@ -0,0 +1,36 @@
+[ModuleDetails]
+ModuleName=sink
+BaseClassName=Sink
+DefaultClassName=DefaultSparkSink
+
+[CassandraSink]
+Name=CassandraDB
+Description= Spark connector that Supports Writing from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+WriteMode=overwrite
+
+[CassandraSink.SparkConf]
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[EnvConfig]
+FS_DB_USER=FS_DB_USER
+FS_DB_PASSWORD=FS_DB_PASSWORD
+FS_DB_IP=FS_DB_IP
+FS_DB_PORT=FS_DB_PORT
+FS_DB_NAME=FS_DB_KEYSPACE_NAME
+
+
+Cassandra_SinkIP=FS_DB_IP
+Cassandra_SinkPort=FS_DB_PORT
+Cassandra_SinkUser=FS_DB_USER
+Cassandra_SinkPassword=FS_DB_PASSWORD
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
+
+Cassandra_SinkDB=FS_DB_KEYSPACE_NAME
+
+
+
diff --git a/test/source/SourceClassConfig.ini b/test/source/SourceClassConfig.ini
new file mode 100644
index 0000000..5e44f6a
--- /dev/null
+++ b/test/source/SourceClassConfig.ini
@@ -0,0 +1,50 @@
+[InfluxSource]
+Name=Influxdb
+Description= Spark connector that Supports Reading from InfluxDB
+ClassType=Custom
+url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
+token = $ENV{Influx_Token}
+org = $ENV{Influx_DBHeirarchyL1}
+timeout = 100000
+ssl = False
+Query = $Input$query
+
+[InfluxSource.Inputs]
+query=this query for influx db
+
+[CassandraSource]
+Name=Cassandradb
+Description= Spark connector that Supports Reading from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+
+[CassandraSource.SparkConf]
+spark.cassandra.connection.host=$ENV{Cassandra_SourceIP}
+spark.cassandra.connection.port=$ENV{Cassandra_SourcePort}
+spark.cassandra.auth.username=$ENV{Cassandra_SourceUser}
+spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword}
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[ModuleDetails]
+ModuleName=source
+BaseClassName=Source
+DefaultClassName=DefaultSparkSource
+
+[EnvConfig]
+Influx_SourceIP=Influx_DATALAKE_IP
+Influx_SourcePort=Influx_DATALAKE_PORT
+Influx_MonSourcePort=Influx_DATALAKE_PORT
+Influx_DBHeirarchyL1=Influx_ORG_NAME
+Influx_DBHeirarchyL2=Influx_BUCKET_NAME
+Influx_Token=Influx_Token
+
+Cassandra_SourceIP=Cassandra_DATALAKE_IP
+Cassandra_SourcePort=Cassandra_DATALAKE_PORT
+Cassandra_SourceUser=Cassandra_DATALAKE_USER
+Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD
+Cassandra_SourceDB=Cassandra_KEYSPACE_NAME
+Cassandra_SourceTable=Cassandra_TABLE_NAME
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
\ No newline at end of file
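Values in these INI files are indirected twice: a `$ENV{NAME}` placeholder names a key in the file's own [EnvConfig] section, which in turn names the real environment variable (populated from test/test_env.env below). A hedged sketch of how such a placeholder can be resolved; the actual lookup inside ConfigHelper may differ:

    import os
    import re

    def resolve_env(value, env_map):
        # env_map comes from the [EnvConfig] section: placeholder -> real env var name
        def repl(match):
            return os.environ.get(env_map.get(match.group(1), match.group(1)), "")
        return re.sub(r"\$ENV\{(\w+)\}", repl, value)

    os.environ["Influx_DATALAKE_IP"] = "10.0.0.5"      # illustrative values only
    os.environ["Influx_DATALAKE_PORT"] = "8086"
    env_map = {"Influx_SourceIP": "Influx_DATALAKE_IP",
               "Influx_SourcePort": "Influx_DATALAKE_PORT"}
    print(resolve_env("http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}", env_map))
    # -> http://10.0.0.5:8086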
diff --git a/test/test_DefaultSparkSource.py b/test/test_DefaultSparkSource.py
new file mode 100644
index 0000000..e1a1b8e
--- /dev/null
+++ b/test/test_DefaultSparkSource.py
@@ -0,0 +1,68 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+sys.path.extend(["dataextraction/"])
+from source.DefaultSparkSource import DefaultSparkSource
+from ConfigHelper import ConfigHelper
+
+class helper2:
+    def __init__(self):
+        pass
+    def format(self, input1):
+        return Spark_helper()
+
+class Spark_helper:
+    def __init__(self):
+        self.test_result = False
+        self.read = helper2()
+
+    def addConf(self, key, value):
+        self.test_result = True
+
+    def load(self):
+        return 'Spark Session Loaded'
+
+
+class Test_DefaultSparkSource:
+    def setup_method(self):
+        os.environ['CODE_DIR_PATH'] = 'test'
+        load_dotenv('test/test_env.env')
+        self.obj = DefaultSparkSource("CassandraSource")
+
+
+    def test_init(self):
+        assert self.obj != None, 'DefaultSparkSource (Single args) Object Creation, Failed'
+
+    def test_init3(self):
+        load_dotenv('test/test_env.env')
+        spark_helper_obj = Spark_helper()
+        self.obj.init(spark_helper_obj, ConfigHelper(), {})
+        assert spark_helper_obj.test_result, 'Initialisation of Spark Source Failed'
+
+    def test_load(self):
+        spark_helper_obj = Spark_helper()
+        self.obj.init(spark_helper_obj, ConfigHelper(), {})
+        out = self.obj.load(spark_helper_obj)
+
+        assert out == 'Spark Session Loaded', 'Data Extraction Failed'
\ No newline at end of file
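Spark_helper and helper2 are hand-rolled duck-typed stubs: `addConf` records that the source configured the session, and `read.format(...).load()` mimics the DataFrame reader chain. The same shape could be expressed with unittest.mock instead of explicit classes; a sketch of that equivalent (the committed tests deliberately use plain stub classes):

    from unittest.mock import MagicMock

    spark = MagicMock()
    spark.read.format.return_value.load.return_value = 'Spark Session Loaded'

    # Mirrors test_load: whatever format key the source passes, load() yields the sentinel.
    assert spark.read.format("org.apache.spark.sql.cassandra").load() == 'Spark Session Loaded'
    spark.read.format.assert_called_once()   # call tracking replaces the stub's test_result flag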
diff --git a/test/test_cassandrasink.py b/test/test_cassandrasink.py
new file mode 100644
index 0000000..9349d47
--- /dev/null
+++ b/test/test_cassandrasink.py
@@ -0,0 +1,80 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
+from sink.CassandraSink import CassandraSink
+from ConfigHelper import ConfigHelper
+
+class sparkSessionHelper():
+    def __init__(self):
+        pass
+
+    def addConf(self, key, value):
+        pass
+
+class df_helper():
+    def __init__(self):
+        self.schema = self
+        self.names = ['Id', 'DLPRB']
+        self.write = self
+        self.saved = False
+
+    def select(self, query):
+        return self
+
+    def withColumn(self, col_name, order):
+        return self
+
+    def format(self, key):
+        return self
+
+    def options(self, **kwargs):
+        return self
+
+    def mode(self, mode):
+        return self
+
+    def save(self):
+        self.saved = True
+
+
+class Test_CassandraSink():
+    def setup_method(self):
+        self.obj = CassandraSink("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'CollectionName': 'last_check3'}
+        self.obj.init(sparkSessionHelper(), fsConf, input_dict)
+
+    def test_init(self):
+        assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
+
+    @patch('sink.CassandraSink.Cluster')
+    @patch('sink.CassandraSink.Cluster.connect')
+    @patch('sink.CassandraSink.monotonically_increasing_id')
+    def test_write(self, mock1, mock2, mock3):
+        df_helper_obj = df_helper()
+        self.obj.write(sparkSessionHelper(), df_helper_obj)
+        assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file
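Two details of this file are worth noting. df_helper returns `self` from every chained call, which is the cheapest way to fake Spark's fluent `df.write.format(...).options(...).mode(...).save()` chain while recording only the final `save()`. And with stacked `@patch` decorators, the mocks are injected bottom-up, so in `test_write` mock1 is the `monotonically_increasing_id` patch and mock3 the `Cluster` patch. A compact, runnable illustration of that ordering rule (targets are illustrative):

    import os
    from unittest.mock import patch

    @patch('os.getcwd')      # outermost decorator -> last mock argument
    @patch('os.listdir')     # innermost decorator -> first mock argument
    def check(mock_listdir, mock_getcwd):
        os.listdir(".")
        mock_listdir.assert_called_once()   # proves the first argument is the listdir patch
        mock_getcwd.assert_not_called()

    check()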
diff --git a/test/test_confighelper.py b/test/test_confighelper.py
new file mode 100644
index 0000000..0cbd154
--- /dev/null
+++ b/test/test_confighelper.py
@@ -0,0 +1,65 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+sys.path.extend(["dataextraction/"])
+from ConfigHelper import ConfigHelper
+from tmgr_logger import TMLogger
+from sink.CassandraSink import CassandraSink
+
+
+def test_initWithCodePath():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    fsConf = ConfigHelper()
+    assert None != fsConf
+
+def test_getClassConfig():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    fsConf = ConfigHelper()
+    assert None != fsConf.getClassConfig(CassandraSink("sink"))
+
+def test_getEnvConfig():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    assert None != ConfigHelper().getEnvConfig()
+
+def test_getLogLevel():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    assert None != ConfigHelper().getLogLevel()
+
+def test_getLogger():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    assert None != ConfigHelper().getLogger()
+
+def test_getFsHost():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    assert None != ConfigHelper().getFsHost()
+
+def test_getFsPort():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    assert None != ConfigHelper().getFsPort()
\ No newline at end of file
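These tests construct ConfigHelper() repeatedly without teardown, which is safe because ConfigHelper is declared with `metaclass=Singleton` (visible in the ConfigHelper.py hunk above): every call returns the one cached instance. A minimal sketch of that metaclass pattern, which may differ in detail from the project's Singleton:

    class Singleton(type):
        _instances = {}

        def __call__(cls, *args, **kwargs):
            # Construct once, then hand back the cached instance forever after.
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            return cls._instances[cls]

    class ConfigDemo(metaclass=Singleton):   # stand-in for ConfigHelper
        pass

    assert ConfigDemo() is ConfigDemo()      # one shared instance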
diff --git a/test/test_defaultsparksink.py b/test/test_defaultsparksink.py
new file mode 100644
index 0000000..7decd93
--- /dev/null
+++ b/test/test_defaultsparksink.py
@@ -0,0 +1,63 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env')
+os.environ['CODE_DIR_PATH'] = 'test'
+sys.path.extend(["dataextraction/"])
+
+from sink.DefaultSparkSink import DefaultSparkSink
+from ConfigHelper import ConfigHelper
+import main
+
+class helper:
+    '''
+    In order to mimic/mock the line sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save() for write
+    '''
+    def __init__(self):
+        self.save_status = False
+        self.write = self
+
+    def format(self, sparkloadkey):
+        return self
+
+    def mode(self, write_type):
+        return self
+
+    def save(self):
+        self.save_status = True
+
+class Test_DefaultSparkSink:
+    def setup_method(self):
+        self.sink_obj = DefaultSparkSink("CassandraSink")
+        fsConf = ConfigHelper()
+        input_dict = {'CollectionName': 'last_check3'}
+        self.sink_obj.init(main.session_helper, fsConf, input_dict)
+
+    def test_init_sparksink(self):
+        assert None != self.sink_obj.logger
+
+
+    def test_write(self):
+        helper_obj = helper()
+        self.sink_obj.write(None, helper_obj)
+        assert helper_obj.save_status
\ No newline at end of file
diff --git a/test/test_defaultsparktransform.py b/test/test_defaultsparktransform.py
new file mode 100644
index 0000000..abacc3f
--- /dev/null
+++ b/test/test_defaultsparktransform.py
@@ -0,0 +1,38 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env')
+os.environ['CODE_DIR_PATH'] = 'test'
+sys.path.extend(["dataextraction/"])
+from transform.DefaultSparkTransform import DefaultSparkTransform
+from ConfigHelper import ConfigHelper
+import main
+
+def test_init_default_spark_transform():
+    fsConf = ConfigHelper()
+    trans_obj = DefaultSparkTransform("transform")
+    input_dict = {'CollectionName': 'last_check3'}
+    trans_obj.init(main.session_helper, fsConf, input_dict)
+    trans_obj.transform(main.session_helper, None) # Remove this when Transform is implemented properly, and write another test_case for transform then!!
+    assert None != trans_obj
+
diff --git a/test/test_env.env b/test/test_env.env
new file mode 100644
index 0000000..9492f6f
--- /dev/null
+++ b/test/test_env.env
@@ -0,0 +1,22 @@
+# change the values accordingly and load all these env variables via the 'source' command
+FS_API_HOST=''
+FS_API_PORT=1111
+FS_DB_IP=''
+FS_DB_PORT=1234
+FS_DB_USER='dummy_user'
+FS_DB_PASSWORD=''
+FS_DB_KEYSPACE_NAME='dummy_namespace'
+#Influx Data Lake
+Influx_DATALAKE_IP=
+Influx_DATALAKE_PORT=1235
+Influx_ORG_NAME='dummy_org'
+Influx_BUCKET_NAME='dummy_org'
+Influx_Token=
+#Cassandra DB Datalake
+Cassandra_DATALAKE_IP=
+Cassandra_DATALAKE_PORT=54321
+Cassandra_DATALAKE_USER='dummy_cas_user'
+Cassandra_DATALAKE_PASSWORD=''
+Cassandra_KEYSPACE_NAME='dummy_cass_namespace'
+Cassandra_TABLE_NAME='dummy_table_name'
+Cassandra_CONS_LEVEL='ONE'
\ No newline at end of file
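Order matters in these test files: the .env values and CODE_DIR_PATH must be in place before importing anything that reads the environment at import time (ConfigHelper and main both do). A condensed sketch of the preamble pattern each test file follows:

    import os
    import sys
    from dotenv import load_dotenv

    load_dotenv('test/test_env.env')          # populate os.environ from the .env file
    os.environ['CODE_DIR_PATH'] = 'test'      # point ConfigHelper at the test/ configs
    sys.path.extend(["dataextraction/"])      # make the package modules importable
    # ...only now import ConfigHelper, main, the sources/sinks, etc.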
diff --git a/test/test_influxsource.py b/test/test_influxsource.py
new file mode 100644
index 0000000..d028e31
--- /dev/null
+++ b/test/test_influxsource.py
@@ -0,0 +1,60 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
+from source.InfluxSource import InfluxSource
+from ConfigHelper import ConfigHelper
+
+
+class helper():
+    def __init__(self):
+        pass
+
+    def createDataFrame(self, data_df):
+        return "Data Loaded Successfully"
+
+class Test_InfluxSource():
+    def setup_method(self):
+        # self.obj_zero_args = InfluxSource()
+        self.obj_one_arg = InfluxSource("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'query': 'Select * from last_check3'}
+        self.obj_one_arg.init(None, fsConf, input_dict)
+
+    def test_init(self):
+        assert self.obj_one_arg.logger != None, "Influx Source Object Creation Failed"
+
+    def test_negative_init(self):
+        influx_source_obj = InfluxSource("InfluxDb")
+        fsConf = ConfigHelper()
+        input_dict = {'dummy-key': 'dummy-value'}
+        influx_source_obj.init(None, fsConf, input_dict)
+        assert influx_source_obj.logger != None, "Influx Source Object Creation Failed"
+
+    @patch('source.InfluxSource.InfluxDBClient')
+    def test_load(self, mock1):
+        out = self.obj_one_arg.load(helper())
+        assert out == "Data Loaded Successfully", "Data Failed to Load"
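The patch target in test_load follows the standard mock rule: patch the name where it is looked up, not where it is defined. InfluxSource imports InfluxDBClient into its own namespace, so the test patches 'source.InfluxSource.InfluxDBClient' rather than 'influxdb_client.InfluxDBClient'. The same rule in isolation (the target path assumes the repo layout these tests set up via sys.path):

    from unittest.mock import patch

    # Replaces InfluxDBClient *as seen by* source/InfluxSource.py; patching
    # 'influxdb_client.InfluxDBClient' would leave InfluxSource's reference intact.
    with patch('source.InfluxSource.InfluxDBClient') as mock_client:
        # inside this block, InfluxSource.load() constructs the mock, not a real client
        assert mock_client is not None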
diff --git a/test/test_main.py b/test/test_main.py
new file mode 100644
index 0000000..a2499f5
--- /dev/null
+++ b/test/test_main.py
@@ -0,0 +1,145 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+import json
+from mock import patch
+from flask_api import status
+
+load_dotenv('test/test_env.env') # Loading Env Variables
+os.environ['CODE_DIR_PATH']='test' # Telling ConfigHelper to look into test folder for log_config.yaml
+sys.path.extend(["dataextraction/"]) # In order to import dataextraction functions
+
+
+import main
+from SparkHelper import sparkSessionManager
+from Pipeline import Pipeline
+from main import task
+
+
+def empty_tasks():
+    while(main.tasks.qsize() > 0):
+        task_id = main.tasks.get()
+        main.tasks.task_done()
+
+class Test_feature_groups:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    def test_feature_group(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        empty_tasks()
+        assert response.content_type == 'application/json'
+        assert response.status_code == status.HTTP_200_OK
+
+    #Error test
+    def test_negative_feature_groups(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}]}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        assert response.content_type == 'text/html; charset=utf-8'
+
+
+class Test_task_status:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    def test_task_status(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json, "Accepted")
+        response = self.client.get("/task-status/unittest_task")
+        main.task_map.clear()
+        assert response.content_type == 'application/json'
+        assert response.status_code == status.HTTP_200_OK
+
+    #Error test
+    def test_negative_task_status(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json, "Error")
+        response = self.client.get("/task-status/unittest_task")
+        main.task_map.clear()
+
+        assert str(response.status) == "500 INTERNAL SERVER ERROR", "Test_negative_task_status didn't return response code 500"
+
+class Test_task_statuses:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    def test_task_statuses(self):
+        response = self.client.get("/task-statuses")
+        assert None != response
+
+
+class Test_delete_task_status:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    def test_delete_task_status(self):
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        main.task_map["unittest_task"] = task(request_json, "Accepted")
+        response = self.client.delete("/delete-task-status/unittest_task")
+        main.task_map.clear()
+        assert response.content_type == 'application/json'
+        assert response.status_code == status.HTTP_200_OK
+
+    #error scenario
+    def test_negative_delete_task_status(self):
+        response = self.client.delete("/delete-task-status/unittest_task")
+        assert response.content_type == 'application/json'
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+
+class Test_async_code_worker:
+    def setup_method(self):
+        self.client = main.app.test_client(self)
+
+    @patch('main.session_helper.getSession')
+    @patch('main.factory.getBatchPipeline')
+    def test_negative_async_code_worker_1(self, mock1, mock2):
+        main.infinte_loop_config["infinte_run"] = "True"
+        main.infinte_loop_config["unit_test_mode"] = "True"
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
+
+    #error
+    def test_negative_async_code_worker_2(self):
+        main.infinte_loop_config["infinte_run"] = "True"
+        main.infinte_loop_config["unit_test_mode"] = "True"
+        request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
+
+    #error with Cassandra Source
+    def test_negative_async_code_worker_3(self):
+        main.infinte_loop_config["infinte_run"] = "True"
+        main.infinte_loop_config["unit_test_mode"] = "True"
+        request_json = {'source': {'CassandraSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+        response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+        main.async_code_worker()
+        assert main.infinte_loop_config["infinte_run"] == "False"
\ No newline at end of file
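Rather than letting the daemon thread from main.py run, these worker tests enqueue a request through Flask's built-in test client and then call main.async_code_worker() synchronously, relying on unit_test_mode to make it return. The test-client technique itself, reduced to a standalone sketch with a hypothetical app (not the project's routes):

    import json
    from flask import Flask, jsonify

    app = Flask(__name__)

    @app.route("/feature-groups", methods=["POST"])
    def feature_groups():
        return jsonify({"status": "Accepted"})

    client = app.test_client()                 # no real HTTP server needed
    response = client.post("/feature-groups",
                           data=json.dumps({"source": {}}),
                           content_type="application/json")
    assert response.status_code == 200
    assert response.content_type == "application/json"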
"True" + main.infinte_loop_config["unit_test_mode"]= "True" + request_json = {'source': {'CassandraSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}} + response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json') + main.async_code_worker() + assert main.infinte_loop_config["infinte_run"] == "False" \ No newline at end of file diff --git a/test/test_pipeline.py b/test/test_pipeline.py new file mode 100644 index 0000000..62f0d10 --- /dev/null +++ b/test/test_pipeline.py @@ -0,0 +1,92 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import sys +import os +from dotenv import load_dotenv + +# In order to import dataextraction functions +sys.path.extend(["dataextraction/"]) + +from Pipeline import Pipeline +from FeatureEngineeringFactory import FeatureEngineeringFactory +from SparkHelper import sparkSessionManager + + + +class helper: + '''Helper class to Mimic data Load, Transform and Sink''' + def __init__(self): + pass + + def load(self, sparksession): + return 'Data Load Completed' + + def transform(self, sparksession, df_list): + return 'Data Transform Completed' + + def write(self, sparksession, transform_df_list): + return 'Data Written to Sink' + +class Test_Pipeline: + def setup_method(self): + api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}} + load_dotenv('test/test_env.env') + os.environ['CODE_DIR_PATH'] = 'test' + session_helper = sparkSessionManager() + factory = FeatureEngineeringFactory(session_helper) + (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink']) + self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict)) + self.spark_session = session_helper + + + def test_init_pipeline(self): + assert self.obj != None, 'Pipeline Object Creation, Failed' + + + + def test_loadData(self): + assert self.obj != None, 'Pipeline Object Creation, Failed' + self.obj.sources[0] = helper() + self.obj.loadData(self.spark_session ) + assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed' + + def test_transformData(self): + self.obj.transformers[0] 
diff --git a/test/test_tmgr_logger.py b/test/test_tmgr_logger.py
new file mode 100644
index 0000000..99501e1
--- /dev/null
+++ b/test/test_tmgr_logger.py
@@ -0,0 +1,47 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+
+from tmgr_logger import TMLogger
+
+def test_get_logger():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    tm_logger = TMLogger("test/log_config.yaml")
+    assert None != tm_logger.get_logger
+
+def test_get_logLevel():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    tm_logger = TMLogger("test/log_config.yaml")
+    assert None != tm_logger.get_logLevel
+
+def test_init_withWrongFile():
+    load_dotenv('test/test_env.env')
+    os.environ['CODE_DIR_PATH'] = 'test'
+    with pytest.raises(Exception) as exc:
+        tm_logger = TMLogger("bad_log_config.yaml")
+
+    assert "error opening yaml config file" in str(exc.value)
\ No newline at end of file
diff --git a/test/transform/TransformClassConfig.ini b/test/transform/TransformClassConfig.ini
new file mode 100644
index 0000000..9f2f398
--- /dev/null
+++ b/test/transform/TransformClassConfig.ini
@@ -0,0 +1,11 @@
+[SQLTransform]
+Name=SQLTransformer
+Description= SQLTransformer Transforms your data using SQL statements
+ClassType=Custom
+
+[ModuleDetails]
+ModuleName=transform
+BaseClassName=Transform
+DefaultClassName=DefaultSparkTransform
+
+[EnvConfig]
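With the packages from test/requirements_tests.txt installed, the suite added by this change runs from the repository root; the coverage flag assumes pytest-cov, which the requirements pull in. A programmatic equivalent of the usual command line:

    import pytest

    # Same as running "pytest test/ --cov=dataextraction" from the repo root.
    pytest.main(["test/", "--cov=dataextraction"])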