try:
self.exec_path = os.getenv('CODE_DIR_PATH')
- self.tm_logger = TMLogger("../config/log_config.yaml")
+
+ if self.exec_path is None:
+ self.tm_logger = TMLogger("../config/log_config.yaml")
+ else:
+ self.tm_logger = TMLogger(self.exec_path + "/log_config.yaml")
self.logger = self.tm_logger.logger
factory = FeatureEngineeringFactory(session_helper)
tasks = queue.Queue()
task_map = {}
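+# Loop-control flags for the async worker below: unit tests set "unit_test_mode" to
+# "True" so the loop exits after a single pass instead of running forever.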
+infinte_loop_config = {"infinte_run": "True", "unit_test_mode": "False"}
class task():
"""
AsyncCode Worker
Infinite loop which will retrieve and process tasks assigned for executing data extraction
"""
- while True:
+ while infinte_loop_config["infinte_run"] == "True":
try:
start_time = datetime.datetime.now()
logger.debug(str(start_time) + " Feature Engineering Pipeline Started")
api_result_msg = str(exc)
task_map[task_id].status = "Error"
task_map[task_id].error = api_result_msg
+ if infinte_loop_config["unit_test_mode"] == "True":
+ infinte_loop_config["infinte_run"] = "False"
+ logger.debug(infinte_loop_config)
if __name__ == "__main__":
print("******Initiaizing feature store API ******" )
threading.Thread(target=async_code_worker, daemon=True).start()
"""
@Class : Influx DB Source
"""
- def __init__(self):
- """
- @No Args Constructore
- """
- super().__init__()
- self.logger = None
- self.classconfig = None
- self.url = None
- self.token = None
- self.org = None
- self.timeout = None
- self.ssl = None
- self.query = None
-
-
def __init__(self,classflavour):
"""
@Method: Single Arg Constructor
except FileNotFoundError as err:
print("error opening yaml config file")
print(err)
+ raise Exception("error opening yaml config file") from err
@property
def get_logger(self):
--- /dev/null
+[BaseConfig]
+DefaultAppName=Pipeline
+DefaultMaster=local[1]
+Override_Log_Level=ERROR
+[ExtraConfig]
+spark.logConf=true
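A minimal sketch (editor's illustration, not code from this change) of how the [BaseConfig] and [ExtraConfig] entries above could be consumed: read the INI with Python's configparser and pass the values to a SparkSession builder. The function name build_spark_session and the config filename are assumptions.

import configparser
from pyspark.sql import SparkSession

def build_spark_session(config_path="spark_config.ini"):
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep keys such as spark.logConf case-sensitive
    parser.read(config_path)
    base = parser["BaseConfig"]
    builder = (SparkSession.builder
               .appName(base.get("DefaultAppName", "Pipeline"))
               .master(base.get("DefaultMaster", "local[1]")))
    # Every key in [ExtraConfig] is forwarded verbatim as a Spark conf entry.
    for key, value in parser["ExtraConfig"].items():
        builder = builder.config(key, value)
    return builder.getOrCreate()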
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+version: 1
+formatters:
+ simple:
+ format: '%(asctime)s | %(filename)s %(lineno)s %(funcName)s() | %(levelname)s | %(message)s'
+handlers:
+ console:
+ class: logging.StreamHandler
+ level: DEBUG
+ formatter: simple
+ stream: ext://sys.stdout
+ access_file:
+ class: logging.handlers.RotatingFileHandler
+ level: DEBUG
+ formatter: simple
+ filename: fs.log
+ maxBytes: 10485760
+ backupCount: 20
+ encoding: utf8
+root:
+ level: DEBUG
+ handlers: [access_file,console]
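The YAML above is a standard logging dictConfig document (console handler plus a rotating fs.log file handler, both at DEBUG). A sketch of how such a file can be applied, roughly what TMLogger is expected to do when given this path; configure_logging is an illustrative name, not the project's API.

import logging.config
import yaml

def configure_logging(path="log_config.yaml"):
    # Parse the YAML and hand the resulting dict to the standard logging machinery.
    with open(path, "r", encoding="utf-8") as fh:
        logging.config.dictConfig(yaml.safe_load(fh))
    return logging.getLogger()  # root logger: DEBUG, console + rotating file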
--- /dev/null
+pytest-cov
+python-dotenv
+pyspark
+cassandra-driver==3.25.0
+jsonpickle==2.0.0
+Flask==2.0.1
+Flask-API==3.0.post1
+Flask-RESTful==0.3.9
+lru-dict==1.1.7
+influxdb_client
+mock
+pandas
\ No newline at end of file
--- /dev/null
+[ModuleDetails]
+ModuleName=sink
+BaseClassName=Sink
+DefaultClassName=DefaultSparkSink
+
+[CassandraSink]
+Name=CassandraDB
+Description= Spark connector that Supports Writing to CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+WriteMode=overwrite
+
+[CassandraSink.SparkConf]
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSink.Inputs]
+
+[EnvConfig]
+FS_DB_USER=FS_DB_USER
+FS_DB_PASSWORD=FS_DB_PASSWORD
+FS_DB_IP=FS_DB_IP
+FS_DB_PORT=FS_DB_PORT
+FS_DB_NAME=FS_DB_KEYSPACE_NAME
+
+
+Cassandra_SinkIP=FS_DB_IP
+Cassandra_SinkPort=FS_DB_PORT
+Cassandra_SinkUser=FS_DB_USER
+Cassandra_SinkPassword=FS_DB_PASSWORD
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
+
+Cassandra_SinkDB=FS_DB_KEYSPACE_NAME
+
+
+
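Both the sink and source configs use $ENV{NAME} placeholders, and [EnvConfig] maps each logical name to the environment variable that supplies its value (for example Cassandra_SinkIP -> FS_DB_IP). A sketch of one way such placeholders could be resolved; resolve_placeholders and env_map are illustrative names, not the project's API.

import os
import re

def resolve_placeholders(value, env_map):
    # env_map comes from the [EnvConfig] section: logical name -> environment variable.
    def lookup(match):
        logical = match.group(1)                 # e.g. Cassandra_CosistencyLevel
        env_var = env_map.get(logical, logical)  # e.g. Cassandra_CONS_LEVEL
        return os.environ.get(env_var, "")
    return re.sub(r"\$ENV\{([^}]+)\}", lookup, value)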
--- /dev/null
+[InfluxSource]
+Name=Influxdb
+Description= Spark connector that Supports Reading from InfluxDB
+ClassType=Custom
+url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
+token = $ENV{Influx_Token}
+org = $ENV{Influx_DBHeirarchyL1}
+timeout = 100000
+ssl = False
+Query = $Input$query
+
+[InfluxSource.Inputs]
+query=this query for influx db
+
+[CassandraSource]
+Name=Cassandradb
+Description= Spark connector that Supports Reading from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+
+[CassandraSource.SparkConf]
+spark.cassandra.connection.host=$ENV{Cassandra_SourceIP}
+spark.cassandra.connection.port=$ENV{Cassandra_SourcePort}
+spark.cassandra.auth.username=$ENV{Cassandra_SourceUser}
+spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword}
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[ModuleDetails]
+ModuleName=source
+BaseClassName=Source
+DefaultClassName=DefaultSparkSource
+
+[EnvConfig]
+Influx_SourceIP=Influx_DATALAKE_IP
+Influx_SourcePort=Influx_DATALAKE_PORT
+Influx_MonSourcePort=Influx_DATALAKE_PORT
+Influx_DBHeirarchyL1=Influx_ORG_NAME
+Influx_DBHeirarchyL2=Influx_BUCKET_NAME
+Influx_Token=Influx_Token
+
+Cassandra_SourceIP=Cassandra_DATALAKE_IP
+Cassandra_SourcePort=Cassandra_DATALAKE_PORT
+Cassandra_SourceUser=Cassandra_DATALAKE_USER
+Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD
+Cassandra_SourceDB=Cassandra_KEYSPACE_NAME
+Cassandra_SourceTable=Cassandra_TABLE_NAME
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
\ No newline at end of file
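Illustration only (assumed use of the influxdb_client package, not this change's InfluxSource code): once url, token, org and timeout are resolved from [InfluxSource], the Flux query can be executed and the result handed to Spark, which is consistent with the createDataFrame stub in the InfluxSource tests below. load_influx_dataframe is an illustrative name.

from influxdb_client import InfluxDBClient

def load_influx_dataframe(spark, url, token, org, flux_query, timeout=100000):
    client = InfluxDBClient(url=url, token=token, org=org, timeout=timeout)
    try:
        # For simple queries this returns a pandas DataFrame.
        pandas_df = client.query_api().query_data_frame(flux_query)
    finally:
        client.close()
    return spark.createDataFrame(pandas_df)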
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+sys.path.extend(["dataextraction/"])
+from source.DefaultSparkSource import DefaultSparkSource
+from ConfigHelper import ConfigHelper
+
+class helper2:
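+ '''Stands in for session.read: format(...) returns a Spark_helper so load() can be chained.'''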
+ def __init__(self):
+ pass
+ def format(self, input1):
+ return Spark_helper()
+
+class Spark_helper:
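+ '''Mocks the minimal Spark session surface used by DefaultSparkSource: addConf() records that it was called and read.format(...).load() returns a fixed marker string.'''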
+ def __init__(self):
+ self.test_result = False
+ self.read = helper2()
+
+ def addConf(self, key, value):
+ self.test_result = True
+
+ def load(self):
+ return 'Spark Session Loaded'
+
+
+
+class Test_DefaultSparkSource:
+ def setup_method(self):
+ os.environ['CODE_DIR_PATH'] = 'test'
+ load_dotenv('test/test_env.env')
+ self.obj = DefaultSparkSource("CassandraSource")
+
+
+
+ def test_init(self):
+ assert self.obj != None, 'DefaultSparkSource (Single args) Object Creation, Failed'
+
+ def test_init3(self):
+ load_dotenv('test/test_env.env')
+ spark_helper_obj = Spark_helper()
+ self.obj.init(spark_helper_obj, ConfigHelper(), {})
+ assert spark_helper_obj.test_result, 'Initialisation of Spark Source Failed'
+
+ def test_load(self):
+ spark_helper_obj = Spark_helper()
+ self.obj.init(spark_helper_obj, ConfigHelper(), {})
+ out = self.obj.load(spark_helper_obj)
+
+ assert out == 'Spark Session Loaded' , 'Data Extraction Failed'
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
+from sink.CassandraSink import CassandraSink
+from ConfigHelper import ConfigHelper
+
+class sparkSessionHelper():
+ def __init__(self):
+ pass
+
+ def addConf(self, key, value):
+ pass
+
+class df_helper():
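+ '''Mocks a Spark DataFrame and its write chain: schema.names, select/withColumn, and write.format(...).options(...).mode(...).save(); save() sets the saved flag the test asserts on.'''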
+ def __init__(self):
+ self.schema = self
+ self.names = ['Id', 'DLPRB']
+ self.write = self
+ self.saved = False
+
+ def select(self, query):
+ return self
+
+ def withColumn(self, col_name, order):
+ return self
+
+ def format(self, key):
+ return self
+
+ def options(self, **kwargs):
+ return self
+
+ def mode(self, mode):
+ return self
+
+ def save(self):
+ self.saved = True
+
+
+class Test_CassandraSink():
+ def setup_method(self):
+ self.obj = CassandraSink("InfluxDb")
+ fsConf = ConfigHelper()
+ input_dict = {'CollectionName': 'last_check3'}
+ self.obj.init(sparkSessionHelper(), fsConf, input_dict)
+
+ def test_init(self):
+ assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
+
+ @patch('sink.CassandraSink.Cluster')
+ @patch('sink.CassandraSink.Cluster.connect')
+ @patch('sink.CassandraSink.monotonically_increasing_id')
+ def test_write(self, mock1, mock2, mock3):
+ df_helper_obj = df_helper()
+ self.obj.write(sparkSessionHelper(),df_helper_obj)
+ assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+sys.path.extend(["dataextraction/"])
+from ConfigHelper import ConfigHelper
+from tmgr_logger import TMLogger
+from sink.CassandraSink import CassandraSink
+
+
+def test_initWithCodePath():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ fsConf = ConfigHelper()
+ assert None != fsConf
+
+def test_getClassConfig():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ fsConf = ConfigHelper()
+ assert None != fsConf.getClassConfig(CassandraSink("sink"))
+
+def test_getEnvConfig():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ assert None != ConfigHelper().getEnvConfig()
+
+def test_getLogLevel():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ assert None != ConfigHelper().getLogLevel()
+
+def test_getLogger():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ assert None != ConfigHelper().getLogger()
+
+def test_getFsHost():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ assert None != ConfigHelper().getFsHost()
+
+def test_getFsPort():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ assert None != ConfigHelper().getFsPort()
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env')
+os.environ['CODE_DIR_PATH']='test'
+sys.path.extend(["dataextraction/"])
+
+from sink.DefaultSparkSink import DefaultSparkSink
+from ConfigHelper import ConfigHelper
+import main
+
+class helper:
+ '''
+ Mocks the call chain sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save() used by write()
+ '''
+ def __init__(self):
+ self.save_status = False
+ self.write = self
+
+ def format(self, sparkloadkey):
+ return self
+
+ def mode(self, write_type):
+ return self
+
+ def save(self):
+ self.save_status = True
+
+class Test_DefaultSparkSink:
+ def setup_method(self):
+ self.sink_obj = DefaultSparkSink("CassandraSink")
+ fsConf = ConfigHelper()
+ input_dict = {'CollectionName': 'last_check3'}
+ self.sink_obj.init(main.session_helper,fsConf,input_dict)
+
+ def test_init_sparksink(self):
+ assert None != self.sink_obj.logger
+
+
+ def test_write(self):
+ helper_obj = helper()
+ self.sink_obj.write(None , helper_obj)
+ assert helper_obj.save_status
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+load_dotenv('test/test_env.env')
+os.environ['CODE_DIR_PATH']='test'
+sys.path.extend(["dataextraction/"])
+from transform.DefaultSparkTransform import DefaultSparkTransform
+from ConfigHelper import ConfigHelper
+import main
+
+def test_init_default_spark_transform():
+ fsConf = ConfigHelper()
+ trans_obj = DefaultSparkTransform("transform")
+ input_dict = {'CollectionName': 'last_check3'}
+ trans_obj.init(main.session_helper,fsConf,input_dict)
+ trans_obj.transform(main.session_helper, None)  # TODO: replace with a dedicated transform test once Transform is fully implemented
+ assert None != trans_obj
+
--- /dev/null
+# Change the values accordingly and export these env variables via the 'source' command
+FS_API_HOST=''
+FS_API_PORT=1111
+FS_DB_IP=''
+FS_DB_PORT=1234
+FS_DB_USER='dummy_user'
+FS_DB_PASSWORD=''
+FS_DB_KEYSPACE_NAME='dummy_namespace'
+#Influx Data Lake
+Influx_DATALAKE_IP=
+Influx_DATALAKE_PORT=1235
+Influx_ORG_NAME='dummy_org'
+Influx_BUCKET_NAME='dummy_bucket'
+Influx_Token=
+#Cassandra DB Datalake
+Cassandra_DATALAKE_IP=
+Cassandra_DATALAKE_PORT=54321
+Cassandra_DATALAKE_USER='dummy_cas_user'
+Cassandra_DATALAKE_PASSWORD=''
+Cassandra_KEYSPACE_NAME='dummy_cass_namespace'
+Cassandra_TABLE_NAME='dummy_table_name'
+Cassandra_CONS_LEVEL='ONE'
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+import sys
+import os
+from dotenv import load_dotenv
+from mock import patch
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
+from source.InfluxSource import InfluxSource
+from ConfigHelper import ConfigHelper
+
+
+
+class helper():
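+ '''Stands in for the Spark session: createDataFrame() returns a fixed marker string so load() can be asserted.'''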
+ def __init__(self):
+ pass
+
+ def createDataFrame(self, data_df):
+ return "Data Loaded Successfully"
+
+class Test_InfluxSource():
+ def setup_method(self):
+ self.obj_one_arg = InfluxSource("InfluxDb")
+ fsConf = ConfigHelper()
+ input_dict = {'query': 'Select * from last_check3'}
+ self.obj_one_arg.init(None, fsConf, input_dict)
+
+ def test_init(self):
+ assert self.obj_one_arg.logger != None, "Influx Source Object Creation Failed"
+
+ def test_negative_init(self):
+ influx_source_obj = InfluxSource("InfluxDb")
+ fsConf = ConfigHelper()
+ input_dict = {'dummy-key': 'dummy-value'}
+ influx_source_obj.init(None, fsConf, input_dict)
+ assert influx_source_obj.logger != None, "Influx Source Object Creation Failed"
+
+ @patch('source.InfluxSource.InfluxDBClient')
+ def test_load(self, mock1):
+ out = self.obj_one_arg.load(helper())
+ assert out == "Data Loaded Successfully", "Data Failed to Load"
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+import json
+from mock import patch
+from flask_api import status
+
+load_dotenv('test/test_env.env') # Loading Env Variables
+os.environ['CODE_DIR_PATH']='test' # Telling ConfigHelper to look into test folder for log_config.yaml
+sys.path.extend(["dataextraction/"]) # In order to import dataextraction functions
+
+
+import main
+from SparkHelper import sparkSessionManager
+from Pipeline import Pipeline
+from main import task
+
+
+
+def empty_tasks():
+ while(main.tasks.qsize() > 0):
+ task_id = main.tasks.get()
+ main.tasks.task_done()
+
+class Test_feature_groups:
+ def setup_method(self):
+ self.client = main.app.test_client(self)
+
+
+ def test_feature_group(self):
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+ empty_tasks()
+ assert response.content_type == 'application/json'
+ assert response.status_code == status.HTTP_200_OK
+
+ #Error test
+ def test_negative_feature_groups(self):
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}]}
+ response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+ assert response.content_type == 'text/html; charset=utf-8'
+
+
+
+class Test_task_status:
+ def setup_method(self):
+ self.client = main.app.test_client(self)
+
+ def test_task_status(self):
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ main.task_map["unittest_task"] = task(request_json ,"Accepted")
+ response = self.client.get("/task-status/unittest_task")
+ main.task_map.clear()
+ assert response.content_type == 'application/json'
+ assert response.status_code == status.HTTP_200_OK
+
+ #Error test
+ def test_negative_task_status(self):
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ main.task_map["unittest_task"] = task(request_json ,"Error")
+ response = self.client.get("/task-status/unittest_task")
+ main.task_map.clear()
+
+ assert str(response.status) == "500 INTERNAL SERVER ERROR", "test_negative_task_status did not return response code 500"
+
+class Test_task_statuses:
+ def setup_method(self):
+ self.client = main.app.test_client(self)
+
+ def test_task_statuses(self):
+ response = self.client.get("/task-statuses")
+ assert None != response
+
+
+class Test_delete_task_status:
+ def setup_method(self):
+ self.client = main.app.test_client(self)
+
+ def test_delete_task_status(self):
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ main.task_map["unittest_task"] = task(request_json ,"Accepted")
+ response = self.client.delete("/delete-task-status/unittest_task")
+ main.task_map.clear()
+ assert response.content_type == 'application/json'
+ assert response.status_code == status.HTTP_200_OK
+
+ #error scenario
+ def test_negative_delete_task_status(self):
+ response = self.client.delete("/delete-task-status/unittest_task")
+ assert response.content_type == 'application/json'
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+
+class Test_async_code_worker:
+ def setup_method(self):
+ self.client = main.app.test_client(self)
+
+ @patch('main.session_helper.getSession')
+ @patch('main.factory.getBatchPipeline')
+ def test_negative_async_code_worker_1(self,mock1,mock2):
+
+ main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["unit_test_mode"]= "True"
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+ main.async_code_worker()
+ assert main.infinte_loop_config["infinte_run"] == "False"
+
+
+ #error
+ def test_negative_async_code_worker_2(self):
+
+ main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["unit_test_mode"]= "True"
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+ main.async_code_worker()
+ assert main.infinte_loop_config["infinte_run"] == "False"
+
+ #error with Cassandra Source
+ def test_negative_async_code_worker_3(self):
+
+ main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["unit_test_mode"]= "True"
+ request_json = {'source': {'CassandraSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
+ main.async_code_worker()
+ assert main.infinte_loop_config["infinte_run"] == "False"
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+import os
+from dotenv import load_dotenv
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+
+from Pipeline import Pipeline
+from FeatureEngineeringFactory import FeatureEngineeringFactory
+from SparkHelper import sparkSessionManager
+
+
+
+class helper:
+ '''Helper class to Mimic data Load, Transform and Sink'''
+ def __init__(self):
+ pass
+
+ def load(self, sparksession):
+ return 'Data Load Completed'
+
+ def transform(self, sparksession, df_list):
+ return 'Data Transform Completed'
+
+ def write(self, sparksession, transform_df_list):
+ return 'Data Written to Sink'
+
+class Test_Pipeline:
+ def setup_method(self):
+ api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH'] = 'test'
+ session_helper = sparkSessionManager()
+ factory = FeatureEngineeringFactory(session_helper)
+ (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
+ self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+ self.spark_session = session_helper
+
+
+ def test_init_pipeline(self):
+ assert self.obj != None, 'Pipeline Object Creation, Failed'
+
+
+
+ def test_loadData(self):
+ assert self.obj != None, 'Pipeline Object Creation, Failed'
+ self.obj.sources[0] = helper()
+ self.obj.loadData(self.spark_session )
+ assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed'
+
+ def test_transformData(self):
+ self.obj.transformers[0] = helper()
+ self.obj.transformData(self.spark_session)
+
+ assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed'
+
+ def test_transformDataWithNoTransform(self):
+ self.obj.transformers = None
+ self.obj.spark_dflist = 'Data Transform Completed'
+ self.obj.transformData(self.spark_session)
+ assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed When No Transformer is specified'
+
+ def test_writeData(self):
+ self.obj.sinks[0] = helper()
+ self.obj.writeData(self.spark_session)
+ assert True
+
+
+ def test_execute(self):
+ self.obj.sources[0] = helper()
+ self.obj.transformers[0] = helper()
+ self.obj.sinks[0] = helper()
+ self.obj.execute(self.spark_session)
+ assert True
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+import os
+from dotenv import load_dotenv
+
+# In order to import dataextraction functions
+sys.path.extend(["dataextraction/"])
+
+from tmgr_logger import TMLogger
+
+def test_get_logger():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ tm_logger = TMLogger("test/log_config.yaml")
+ assert None != tm_logger.get_logger
+
+def test_get_logLevel():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ tm_logger = TMLogger("test/log_config.yaml")
+ assert None != tm_logger.get_logLevel
+
+def test_init_withWrongFile():
+ load_dotenv('test/test_env.env')
+ os.environ['CODE_DIR_PATH']='test'
+ with pytest.raises(Exception) as exc:
+ tm_logger = TMLogger("bad_log_config.yaml")
+
+ assert "error opening yaml config file" in str(exc.value)
\ No newline at end of file
--- /dev/null
+[SQLTransform]
+Name=SQLTransformer
+Description= SQLTransformer Transforms your data using SQL statements
+ClassType=Custom
+
+[ModuleDetails]
+ModuleName=transform
+BaseClassName=Transform
+DefaultClassName=DefaultSparkTransform
+
+[EnvConfig]
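Sketch of the intent behind [SQLTransform] (assumed, since DefaultSparkTransform is still a pass-through per the test comment earlier): register the extracted DataFrame as a temporary view and apply the request's FeatureList and SQLFilter as a Spark SQL statement. sql_transform is an illustrative name.

def sql_transform(spark, df, feature_list="*", sql_filter=""):
    # Expose the extracted data to Spark SQL, then project/filter per the request.
    df.createOrReplaceTempView("extracted")
    query = f"SELECT {feature_list} FROM extracted"
    if sql_filter:
        query += f" WHERE {sql_filter}"
    return spark.sql(query)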