import os
import re
from tmgr_logger import TMLogger
+from exceptions_util import DataExtractionException
class Singleton(type):
self.logger = self.tm_logger.logger
- self.log_level = self.tm_logger.LogLevel
+ self.log_level = self.tm_logger.log_level
self.logger.debug("+++++++++++++++Initializaing Config Helper +++++++++++++++++++++++")
self.logger.debug(str(self.exec_path))
self.fs_dict ={}
self.logger.debug("Getting Env Variables for: " + baseclassname)
value=os.getenv(self.envconfig_file[key])
if value is None:
- raise Exception("Error Environment Variable Not Set"+self.envconfig_file[key] )
+ raise DataExtractionException("Error Environment Variable Not Set"+self.envconfig_file[key] )
self.envconfig[key]=value
self.logger.debug("Read Environment Config var: "+self.envconfig_file[key])
except Exception as exc:
"""
try:
classflavour = "Error"
- if my_classinstance.ClassType == "Default":
+ if my_classinstance.class_type == "Default":
classflavour = my_classinstance.flavour
- elif my_classinstance.ClassType == "Custom":
+ elif my_classinstance.class_type == "Custom":
classflavour = my_classinstance.__class__.__name__
baseclass=my_classinstance.__class__.__base__.__name__
self.logger.debug("baseclass is Set to "+baseclass)
self.logger.debug("Source basic properties:" + str(source_props))
source_props_dict = self.__InjectEnvValues(source_props)
- if ((source_props_dict["ClassType"] == "Default" or
- source_props_dict["ClassType"] == "Custom" )
+ if ((source_props_dict["class_type"] == "Default" or
+ source_props_dict["class_type"] == "Custom" )
and baseclass != "Transform" and classflavour != 'InfluxSource'):
injectedconfigsparkconf= self.\
__InjectEnvValues(configsource[classflavour+".SparkConf"])
try:
config = self.getConfigSource(baseclassname)
test= config[my_classname]
- if test["ClassType"] == "Default":
+ if test["class_type"] == "Default":
return True
return False
except Exception as exp:
try:
for key in configdict:
string = configdict[key]
- matchlist=re.findall(r'\$ENV\{[a-zA-Z_0-9]*\}',string)
+ matchlist = re.findall(r'\$ENV\{\w*\}', string)  # \w is shorthand for [a-zA-Z0-9_]
self.logger.debug("Injecting Env Values for,key-value " + key + " " + string )
for replacestr in matchlist:
envkey= replacestr[5:-1]
envkey =envkey.lower()
replacevalue = self.envconfig[envkey]
if replacevalue is None or not isinstance(replacevalue,str):
- raise Exception("environment variable Not found"\
+ raise DataExtractionException("environment variable Not found"\
+self.envconfig_file[envkey])
string = string.replace(replacestr,replacevalue)
configdict[key] = string
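# --- Editor's sketch (illustrative, not part of the module): the $ENV{...}
# --- substitution above, shown in isolation against a hypothetical env dict.
import re

example_envconfig = {"fs_db_ip": "10.0.0.5", "fs_db_port": "9042"}
example_value = "cassandra://$ENV{FS_DB_IP}:$ENV{FS_DB_PORT}"
for placeholder in re.findall(r'\$ENV\{\w*\}', example_value):
    env_key = placeholder[5:-1].lower()    # strip "$ENV{" and "}" and lowercase
    example_value = example_value.replace(placeholder, example_envconfig[env_key])
# example_value is now "cassandra://10.0.0.5:9042"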
self.logger.debug("Class to be imported " + import_class)
my_class = getattr(importlib.import_module(import_class), my_classorflavour)
my_classinstance = my_class(my_classorflavour)
- elif default:
+ else:
self.logger.debug("Module Name "+self.modulename)
self.logger.debug("default Class Name "+self.default_class)
my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class)
return my_instancelist
- def getBatchPipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+ def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
"""
@Function: Creates and returns a batch pipeline
@Input : Source classes, Transform classes, Sink classes, caching key
self.logger.debug("Pipeline Created")
self.spark_dflist = None
self.transformed_df = None
- def loadData(self, session):
+ def load_data(self, session):
"""
@Function: Loads data from source
"""
self.logger.info("Source:" + str(self.sources[0]))
self.spark_dflist = self.sources[0].load(session)
self.logger.info("Data Load Completed")
- def transformData(self, session):
+ def transform_data(self, session):
"""
@Function : Transform Data
"""
else:
self.transformed_df = self.transformers[0].transform(session, self.spark_dflist)
self.logger.info("Data Transform Completed")
- def writeData(self, session):
+ def write_data(self, session):
"""
@Function: Write Data
"""
"""
@Function : Execute Pipeline
"""
- self.loadData(session)
- self.transformData(session)
- self.writeData(session)
+ self.load_data(session)
+ self.transform_data(session)
+ self.write_data(session)
from pyspark.sql import SparkSession
from ConfigHelper import ConfigHelper
-class sparkSessionManager():
+class SparkSessionManager():
"""
@Module:Spark Session Manager
"""
self.loglevel = base["Override_Log_Level"]
self.sconf = SparkConf()
self.addtionalconfig = config["ExtraConfig"]
- def addConf(self, key, value):
+ def add_conf(self, key, value):
"""
@Function: Adding configuration as runtime
"""
self.sconf.set(key, value)
- def getAllConf(self):
+ def get_all_conf(self):
self.logger.debug("*********** ALL CONF *** " + str(self.sconf.getAll()))
- def getSession(self):
+ def get_session(self):
"""
@Function: get Spark Session
"""
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+class DataExtractionException(Exception):
+ """
+ A class used to represent a data extraction exception
+
+ Attributes
+ ----------
+ message : str
+ a formatted string describing the exception
+ code : int
+ HTTP status code
+ """
+
+ def __init__(self, code, message="exception occurred"):
+ """
+ Parameters
+ ----------
+ message : str
+ a formatted string describing the exception
+ code : int
+ HTTP status code
+
+ """
+
+ self.code = code
+ self.message = message
+ super().__init__(self.message)
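+
+# --- Illustrative usage (editor's example; the 404 code and the message are
+# --- sample values, not taken from this change). Kept under a __main__ guard
+# --- so importing this module has no side effects.
+if __name__ == "__main__":
+    try:
+        raise DataExtractionException(404, "example: fs_db_ip environment variable not set")
+    except DataExtractionException as exc:
+        print(exc.code, exc.message)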
+
+
from flask import Flask
from flask_restful import request
from flask_api import status
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
from ConfigHelper import ConfigHelper
from FeatureEngineeringFactory import FeatureEngineeringFactory
fsConf = ConfigHelper()
logger = fsConf.getLogger()
-session_helper = sparkSessionManager()
+session_helper = SparkSessionManager()
factory = FeatureEngineeringFactory(session_helper)
tasks = queue.Queue()
task_map = {}
infinte_loop_config={"infinte_run":"True", "unit_test_mode":"False"}
+default_mime_type = 'application/json'
-class task():
+class Task():
"""
Task Class
"""
api_result_msg = "/task-status/"+task_id
logger.debug("Generated ID"+task_id)
tasks.put(task_id)
- task_map[task_id] = task(request_json ,"Accepted")
+ task_map[task_id] = Task(request_json, "Accepted")
logger.debug("Generated ID"+task_id)
except Exception as exc:
api_result_msg = str(exc)
dumps(\
{ "trainingjob_name":request_json["sink"]["CassandraSink"]["CollectionName"],\
"result" : api_result_msg }),\
- status= response_code,mimetype='application/json')
+ status=response_code, mimetype=default_mime_type)
end_time = datetime.datetime.now()
logger.info(str(end_time-start_time)+' API call finished')
return response
response = app.response_class(response=json.dumps(
{ "task_status":taskstatus,"result" : api_result_msg }),
- status= response_code,mimetype='application/json')
+ status=response_code, mimetype=default_mime_type)
return response
@app.route('/task-statuses', methods=['GET'])
def get_task_statuses():
response = str(exc)
response = app.response_class(response,
- status= response_code,mimetype='application/json')
+ status=response_code, mimetype=default_mime_type)
return response
@app.route('/delete-task-status/<task_id>', methods=['DELETE'])
def delete_task_status(task_id):
except Exception as exc:
response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
api_result_msg = str(exc)
- response = app.response_class(response=json.dumps({ "trainingjob_name": task_id,"result" : api_result_msg }), status= response_code,mimetype='application/json')
+ response = app.response_class(response=json.dumps({ "trainingjob_name": task_id,"result" : api_result_msg }), status= response_code,mimetype = default_mime_type)
return response
def async_code_worker():
while infinte_loop_config["infinte_run"] == "True":
try:
start_time = datetime.datetime.now()
- logger.debug(str(start_time) +"Feature Engineering Pipeline Started")
+ logger.debug(str(start_time) +"Feature Engineering Pipeline Started |-> tESTING IN Progress V2")
task_id = tasks.get()
request_json = task_map[task_id].task
task_map[task_id].status = "In Progress"
sink_dict = request_json["sink"]
c_key = str(source_dict)+str(transform_dict)+str(sink_dict)
logger.debug(c_key)
- feature_engineering_pipeline = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, c_key)
- session = session_helper.getSession()
- feature_engineering_pipeline.loadData(session)
- feature_engineering_pipeline.transformData(session)
- feature_engineering_pipeline.writeData(session)
+ feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key)
+ session = session_helper.get_session()
+ feature_engineering_pipeline.load_data(session)
+ feature_engineering_pipeline.transform_data(session)
+ feature_engineering_pipeline.write_data(session)
session_helper.stop()
task_map[task_id].status = "Completed"
tasks.task_done()
end_time = datetime.datetime.now()
- logger.debug(str(end_time) +"Feature Engineering Pipline Ended")
+ logger.debug(str(end_time) +"Feature Engineering Pipline Ended |-> tESTING IN Progress V2")
except Exception as exc:
session_helper.stop()
traceback.print_exc()
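# --- Editor's sketch (assumption, not shown in this change): the queue-driven
# --- worker above would typically be started once as a daemon thread alongside
# --- the Flask app, e.g.:
import threading

worker_thread = threading.Thread(target=async_code_worker, daemon=True)
worker_thread.start()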
"""
@Method": No Args Constructor
"""
- self.ClassType="Custom"
+ self.class_type="Custom"
@abstractmethod
def init(self, sparkhelper, confighelper, inputdict):
"""
@Method: Constructor
@Input : classflavor
"""
- self.ClassType = "Custom"
+ self.class_type = "Custom"
self.flavour = classflavour
self.logger = None
- classconfig = None
- sparkconfig = None
self.sparkloadkey = None
self.writetype = None
self.tableName = None
self.tableName = inputdict["CollectionName"]
classconfig, sparkconfig = confighelper.getClassConfig(self)
- envConf = confighelper.getEnvConfig()
+ env_conf = confighelper.getEnvConfig()
- self.keyspace = envConf["cassandra_sinkdb"]
- self.host = envConf["fs_db_ip"]
- self.port = envConf["fs_db_port"]
- self.user = envConf["fs_db_user"]
- self.passw = envConf["fs_db_password"]
+ self.keyspace = env_conf["cassandra_sinkdb"]
+ self.host = env_conf["fs_db_ip"]
+ self.port = env_conf["fs_db_port"]
+ self.user = env_conf["fs_db_user"]
+ self.passw = env_conf["fs_db_password"]
self.sparkloadkey = classconfig["SparkLoadKey"]
self.writetype = classconfig["WriteMode"]
print("Spark Config", key, sparkconfig[key])
if value.startswith("$Input$"):
inputkey = value[7:]
- sparkhelper.addConf(key, inputdict[inputkey])
+ sparkhelper.add_conf(key, inputdict[inputkey])
else:
- sparkhelper.addConf(key, value)
+ sparkhelper.add_conf(key, value)
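# --- Editor's sketch, in isolation: the "$Input$" convention above maps a
# --- Spark conf value onto a field of the request input; the dict contents
# --- below are hypothetical.
example_sparkconfig = {
    "spark.cassandra.output.table": "$Input$CollectionName",
    "spark.cassandra.connection.host": "127.0.0.1",
}
example_inputdict = {"CollectionName": "ue_metrics"}
for conf_key, conf_value in example_sparkconfig.items():
    if conf_value.startswith("$Input$"):
        conf_value = example_inputdict[conf_value[7:]]   # strip the "$Input$" prefix
    print(conf_key, "->", conf_value)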
def write(self, sparksession, sparkdf):
"""
session = cluster.connect(self.keyspace)
- session.execute(self.buildDeleteTable(sparkdf))
+ session.execute(self.buildDeleteTable())
query = self.buildCreateTable(sparkdf)
session.execute(query)
self.logger.debug("Create table query " + query)
return query
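# --- Editor's sketch (assumption): the cassandra-driver calls used above, in
# --- isolation; host, port and keyspace are example values and no auth is
# --- configured here.
from cassandra.cluster import Cluster

example_cluster = Cluster(["127.0.0.1"], port=9042)
example_cql_session = example_cluster.connect("example_keyspace")
example_cql_session.execute("DROP TABLE IF EXISTS example_table")
example_cluster.shutdown()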
- def buildDeleteTable(self, sparkdf):
+ def buildDeleteTable(self):
"""
Builds simple cassandra query for deleting table
"""
def buildKeyspaceQuery(self, keyspace):
"""
- Builds cassandra query for creating keyspace
+ Builds Cassandra query for creating a keyspace
"""
query = (
"CREATE KEYSPACE IF NOT EXISTS "
@Method: Constructor
@Input : classflavor
"""
- self.ClassType="Default"
+ self.class_type="Default"
self.flavour = classflavour
self.logger = None
- classconfig = None
- sparkconfig = None
self.sparkloadkey = None
self.writetype = None
def init(self, sparkhelper, confighelper, inputdict):
print("Spark Config",key,sparkconfig[key])
if value.startswith('$Input$'):
inputkey = value[7:]
- sparkhelper.addConf(key,inputdict[inputkey])
+ sparkhelper.add_conf(key,inputdict[inputkey])
else:
- sparkhelper.addConf(key,value)
+ sparkhelper.add_conf(key,value)
def write(self, sparksession, sparkdf):
"""
@Method: write
[CassandraSink]
Name=CassandraDB
Description= Spark connector that supports writing to CassandraDB
-ClassType=Custom
+class_type=Custom
SparkLoadKey=org.apache.spark.sql.cassandra
WriteMode=overwrite
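# --- Editor's sketch (assumption; the project may use a different parser):
# --- reading a section like the one above with the standard library.
import configparser

example_ini = """
[CassandraSink]
Name=CassandraDB
class_type=Custom
SparkLoadKey=org.apache.spark.sql.cassandra
WriteMode=overwrite
"""
parser = configparser.ConfigParser()
parser.read_string(example_ini)
print(parser["CassandraSink"]["class_type"])   # -> Custom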
"""
@Constructor No Args
"""
- self.ClassType="Custom"
+ self.class_type="Custom"
@abstractmethod
def init(self,sparkhelper,confighelper,inputdict):
"""
"""
@Method: Single Arg Constructor
"""
- self.ClassType="Custom"
+ self.class_type="Custom"
self.flavour=classflavour
self.logger=None
self.sparkloadkey=None
self.logger = confighelper.getLogger()
self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour)
classconfig,sparkconfig = confighelper.getClassConfig(self)
- envConf = confighelper.getEnvConfig()
-
-
+ env_conf = confighelper.getEnvConfig()
self.sparkloadkey = classconfig["SparkLoadKey"]
- self.keyspace = envConf['cassandra_sourcedb']
- self.table = envConf['cassandra_sourcetable']
+ self.keyspace = env_conf['cassandra_sourcedb']
+ self.table = env_conf['cassandra_sourcetable']
self.logger.debug("Source keyspace:" + str(self.keyspace) + " table-" + str(self.table))
value = sparkconfig[key]
if value.startswith('$Input$'):
inputkey = value[7:]
- sparkhelper.addConf(key,inputdict[inputkey])
+ sparkhelper.add_conf(key,inputdict[inputkey])
else:
- sparkhelper.addConf(key,value)
+ sparkhelper.add_conf(key,value)
self.logger.debug("Spark cassandra source initialized as" + self.flavour)
"""
@Method: Single Arg Constructor
"""
- self.ClassType="Default"
+ self.class_type="Default"
self.flavour=classflavour
self.logger=None
self.sparkloadkey=None
value = sparkconfig[key]
if value.startswith('$Input$'):
inputkey = value[7:]
- sparkhelper.addConf(key,inputdict[inputkey])
+ sparkhelper.add_conf(key,inputdict[inputkey])
else:
- sparkhelper.addConf(key,value)
+ sparkhelper.add_conf(key,value)
self.logger.debug("Spark Default Source Initialized as"+self.flavour)
def load(self,sparksession):
"""
"""
@Method: Single Arg Constructor
"""
- self.ClassType="Custom"
+ self.class_type="Custom"
self.flavour=classflavour
self.logger = None
self.classconfig = None
[InfluxSource]
Name=Influxdb
Description= Spark connector that Supports Reading from InfluxDB
-ClassType=Custom
+class_type=Custom
url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
token = $ENV{Influx_Token}
org = $ENV{Influx_DBHeirarchyL1}
[CassandraSource]
Name=Cassandradb
Description= Spark connector that Supports Reading from CassandraDB
-ClassType=Custom
+class_type=Custom
SparkLoadKey=org.apache.spark.sql.cassandra
[CassandraSource.SparkConf]
with open(conf_file, 'r') as file:
log_config = yaml.safe_load(file.read())
logging.config.dictConfig(log_config)
- self.LogLevel = log_config["root"]["level"]
+ self.log_level = log_config["root"]["level"]
self.logger = logging.getLogger(__name__)
except FileNotFoundError as err:
print("error opening yaml config file")
return self.logger
@property
- def get_logLevel(self):
- return self.LogLevel
+ def get_log_level(self):
+ return self.log_level
"""
@Method: No Args Constructor
"""
- self.ClassType="Custom"
+ self.class_type="Custom"
@abstractmethod
def init(self, sparkhelper, confighelper, inputdict):
"""
"""
@Method: Constructor
"""
- self.ClassType="Default"
+ self.class_type="Default"
self.flavour=classflavour
def init(self,sparkhelper, confighelper,inputdict):
"""
[SQLTransform]
Name=SQLTransformer
Description= SQLTransformer Transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
[ModuleDetails]
ModuleName=transform
[CassandraSink]
Name=CassandraDB
Description= Spark connector that supports writing to CassandraDB
-ClassType=Custom
+class_type=Custom
SparkLoadKey=org.apache.spark.sql.cassandra
WriteMode=overwrite
[InfluxSource]
Name=Influxdb
Description= Spark connector that Supports Reading from InfluxDB
-ClassType=Custom
+class_type=Custom
url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
token = $ENV{Influx_Token}
org = $ENV{Influx_DBHeirarchyL1}
[CassandraSource]
Name=Cassandradb
Description= Spark connector that Supports Reading from CassandraDB
-ClassType=Custom
+class_type=Custom
SparkLoadKey=org.apache.spark.sql.cassandra
[CassandraSource.SparkConf]
self.test_result = False
self.read = helper2()
- def addConf(self, key, value):
+ def add_conf(self, key, value):
self.test_result = True
def load(self):
def __init__(self):
pass
- def addConf(self, key, value):
+ def add_conf(self, key, value):
pass
class df_helper():
import main
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
from Pipeline import Pipeline
-from main import task
+from main import Task
def test_task_status(self):
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = task(request_json ,"Accepted")
+ main.task_map["unittest_task"] = Task(request_json ,"Accepted")
response = self.client.get("/task-status/unittest_task")
main.task_map.clear()
assert response.content_type == 'application/json'
#Error test
def test_negative_task_status(self):
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = task(request_json ,"Error")
+ main.task_map["unittest_task"] = Task(request_json ,"Error")
response = self.client.get("/task-status/unittest_task")
main.task_map.clear()
def test_delete_task_status(self):
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = task(request_json ,"Accepted")
+ main.task_map["unittest_task"] = Task(request_json ,"Accepted")
response = self.client.delete("/delete-task-status/unittest_task")
main.task_map.clear()
assert response.content_type == 'application/json'
def setup_method(self):
self.client = main.app.test_client(self)
- @patch('main.session_helper.getSession')
- @patch('main.factory.getBatchPipeline')
+ @patch('main.session_helper.get_session')
+ @patch('main.factory.get_batch_pipeline')
def test_negative_async_code_worker_1(self,mock1,mock2):
main.infinte_loop_config["infinte_run"]= "True"
from Pipeline import Pipeline
from FeatureEngineeringFactory import FeatureEngineeringFactory
-from SparkHelper import sparkSessionManager
+from SparkHelper import SparkSessionManager
api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
load_dotenv('test/test_env.env')
os.environ['CODE_DIR_PATH'] = 'test'
- session_helper = sparkSessionManager()
+ session_helper = SparkSessionManager()
factory = FeatureEngineeringFactory(session_helper)
(source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
- self.obj = factory.getBatchPipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+ self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
self.spark_session = session_helper
- def test_loadData(self):
+ def test_load_data(self):
assert self.obj != None, 'Pipeline Object Creation, Failed'
self.obj.sources[0] = helper()
- self.obj.loadData(self.spark_session )
+ self.obj.load_data(self.spark_session )
assert self.obj.spark_dflist == 'Data Load Completed', 'Data Load Failed'
- def test_transformData(self):
+ def test_transform_data(self):
self.obj.transformers[0] = helper()
- self.obj.transformData(self.spark_session)
+ self.obj.transform_data(self.spark_session)
assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed'
- def test_transformDataWithNoTransform(self):
+ def test_transform_data_with_no_transform(self):
self.obj.transformers = None
self.obj.spark_dflist = 'Data Transform Completed'
- self.obj.transformData(self.spark_session)
+ self.obj.transform_data(self.spark_session)
assert self.obj.transformed_df == 'Data Transform Completed', 'Data Transform Failed When No Transformer is specified'
- def test_writeData(self):
+ def test_write_data(self):
self.obj.sinks[0] = helper()
- self.obj.writeData(self.spark_session)
+ self.obj.write_data(self.spark_session)
assert True
load_dotenv('test/test_env.env')
os.environ['CODE_DIR_PATH']='test'
tm_logger = TMLogger("test/log_config.yaml")
- assert None != tm_logger.get_logLevel
+ assert tm_logger.get_log_level is not None
def test_init_withWrongFile():
load_dotenv('test/test_env.env')
[SQLTransform]
Name=SQLTransformer
Description= SQLTransformer Transforms your data using SQL statements
-ClassType=Custom
+class_type=Custom
[ModuleDetails]
ModuleName=transform