From 3ad8e8c9d0d42ee3edc333540c6314ae1945447b Mon Sep 17 00:00:00 2001
From: SANDEEP KUMAR JAISAWAL
Date: Thu, 13 Oct 2022 12:40:51 +0530
Subject: [PATCH] Removing CRLF from a few files

Issue-Id: AIMLFW-2

Signed-off-by: SANDEEP KUMAR JAISAWAL
Change-Id: I2ba4f43e29f80408d17ce188a6e68c26e3799f15
---
 src/sink/CassandraSink.py    | 370 +++++++++++++++++++++----------------------
 src/sink/DefaultSparkSink.py | 130 +++++++--------
 2 files changed, 250 insertions(+), 250 deletions(-)

diff --git a/src/sink/CassandraSink.py b/src/sink/CassandraSink.py
index e095d67..736916e 100644
--- a/src/sink/CassandraSink.py
+++ b/src/sink/CassandraSink.py
@@ -1,185 +1,185 @@
-# ==================================================================================
-#
-# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ==================================================================================
-
-"""
-@ModuleName: Cassandra Sink Class
-"""
-from sink.Base import Sink
-from pyspark import SparkContext, SparkConf
-from pyspark.sql import SQLContext, SparkSession
-from pyspark.sql.types import *
-from cassandra.cluster import Cluster
-from cassandra.auth import PlainTextAuthProvider
-import time
-import json
-from pyspark.sql.functions import monotonically_increasing_id
-
-
-class CassandraSink(Sink):
-    """
-    Writes the queried data into a Cassandra table.
-    It creates the table with the given trainingjob name.
-    It creates the keyspace if it doesn't exist.
-    @Class: Cassandra Sink
-    """
-
-    def __init__(self, classflavour):
-        """
-        @Method: Constructor
-        @Input: classflavour
-        """
-        self.ClassType = "Custom"
-        self.flavour = classflavour
-        self.logger = None
-        classconfig = None
-        sparkconfig = None
-        self.sparkloadkey = None
-        self.writetype = None
-        self.tableName = None
-        self.keyspace = None
-        self.user = None
-        self.passw = None
-        self.host = None
-        self.port = None
-
-    def init(self, sparkhelper, confighelper, inputdict):
-        """
-        @Method: init
-        @Inputs: sparkhelper
-                 confighelper
-                 inputdict
-        """
-        self.logger = confighelper.getLogger()
-
-        self.tableName = inputdict["CollectionName"]
-        classconfig, sparkconfig = confighelper.getClassConfig(self)
-        envConf = confighelper.getEnvConfig()
-
-        self.keyspace = envConf["cassandra_sinkdb"]
-        self.host = envConf["fs_db_ip"]
-        self.port = envConf["fs_db_port"]
-        self.user = envConf["fs_db_user"]
-        self.passw = envConf["fs_db_password"]
-
-        self.sparkloadkey = classconfig["SparkLoadKey"]
-        self.writetype = classconfig["WriteMode"]
-
-        self.logger.debug(
-            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)
-        )
-
-        for key in sparkconfig:
-            value = sparkconfig[key]
-            print("Spark Config", key, sparkconfig[key])
-            if value.startswith("$Input$"):
-                inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
-            else:
-                sparkhelper.addConf(key, value)
-
-    def write(self, sparksession, sparkdf):
-        """
-        @Method: write the data from sparkdf into the Cassandra table
-        @Inputs: sparksession
-                 sparkdf
-        """
-        self.logger.debug("Data writing to Sink " + self.flavour)
-        self.create_table(sparkdf)
-        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
-
-
-
-        write_options = {
-            "table": self.tableName,
-            "keyspace": self.keyspace,
-            "spark.cassandra.connection.host": self.host,
-            "spark.cassandra.connection.port": self.port,
-            "spark.cassandra.auth.username": self.user,
-            "spark.cassandra.auth.password": self.passw,
-        }
-
-        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(
-            "append"
-        ).save()
-
-        self.logger.debug(
-            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
-        )
-
-
-
-    def create_table(self, sparkdf):
-        """
-        Recreates the table in the Cassandra DB as per the trainingjob name.
-        Creates the keyspace, if it doesn't exist.
-        """
-        self.logger.debug("Creating table...")
-
-        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)
-        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)
-
-        session_default = cluster.connect()
-        create_ks_query = self.buildKeyspaceQuery(self.keyspace)
-        session_default.execute(create_ks_query)
-
-        session = cluster.connect(self.keyspace)
-
-
-        session.execute(self.buildDeleteTable(sparkdf))
-
-        query = self.buildCreateTable(sparkdf)
-        session.execute(query)
-        time.sleep(3)
-        cluster.shutdown()
-
-    def buildCreateTable(self, sparkdf):
-        """
-        Builds a simple Cassandra query for creating the table.
-        Column names follow the sparkdf headers; the added __Id column
-        is used as the primary key for the table.
-        """
-        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
-        if sparkdf is not None:
-            col_list = sparkdf.schema.names
-            # To maintain the column name case sensitivity
-            for col in col_list:
-                query = query + ' "' + str(col) + '"' + " text " + ","
-
-        # Creating __Id column as Primary Key
-        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
-        self.logger.debug("Create table query " + query)
-        return query
-
-    def buildDeleteTable(self, sparkdf):
-        """
-        Builds a simple Cassandra query for deleting the table.
-        """
-        query = "DROP TABLE IF EXISTS " + self.tableName + " ;"
-        self.logger.debug("Delete table query " + query)
-        return query
-
-    def buildKeyspaceQuery(self, keyspace):
-        """
-        Builds a Cassandra query for creating the keyspace.
-        """
-        query = (
-            "CREATE KEYSPACE IF NOT EXISTS "
-            + keyspace
-            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"
-        )
-        return query
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Cassandra Sink Class
+"""
+from sink.Base import Sink
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SQLContext, SparkSession
+from pyspark.sql.types import *
+from cassandra.cluster import Cluster
+from cassandra.auth import PlainTextAuthProvider
+import time
+import json
+from pyspark.sql.functions import monotonically_increasing_id
+
+
+class CassandraSink(Sink):
+    """
+    Writes the queried data into a Cassandra table.
+    It creates the table with the given trainingjob name.
+    It creates the keyspace if it doesn't exist.
+    @Class: Cassandra Sink
+    """
+
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input: classflavour
+        """
+        self.ClassType = "Custom"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig = None
+        self.sparkloadkey = None
+        self.writetype = None
+        self.tableName = None
+        self.keyspace = None
+        self.user = None
+        self.passw = None
+        self.host = None
+        self.port = None
+
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+
+        self.tableName = inputdict["CollectionName"]
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        envConf = confighelper.getEnvConfig()
+
+        self.keyspace = envConf["cassandra_sinkdb"]
+        self.host = envConf["fs_db_ip"]
+        self.port = envConf["fs_db_port"]
+        self.user = envConf["fs_db_user"]
+        self.passw = envConf["fs_db_password"]
+
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+
+        self.logger.debug(
+            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)
+        )
+
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config", key, sparkconfig[key])
+            if value.startswith("$Input$"):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: write the data from sparkdf into the Cassandra table
+        @Inputs: sparksession
+                 sparkdf
+        """
+        self.logger.debug("Data writing to Sink " + self.flavour)
+        self.create_table(sparkdf)
+        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
+
+
+
+        write_options = {
+            "table": self.tableName,
+            "keyspace": self.keyspace,
+            "spark.cassandra.connection.host": self.host,
+            "spark.cassandra.connection.port": self.port,
+            "spark.cassandra.auth.username": self.user,
+            "spark.cassandra.auth.password": self.passw,
+        }
+
+        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(
+            "append"
+        ).save()
+
+        self.logger.debug(
+            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
+        )
+
+
+
+    def create_table(self, sparkdf):
+        """
+        Recreates the table in the Cassandra DB as per the trainingjob name.
+        Creates the keyspace, if it doesn't exist.
+        """
+        self.logger.debug("Creating table...")
+
+        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)
+        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)
+
+        session_default = cluster.connect()
+        create_ks_query = self.buildKeyspaceQuery(self.keyspace)
+        session_default.execute(create_ks_query)
+
+        session = cluster.connect(self.keyspace)
+
+
+        session.execute(self.buildDeleteTable(sparkdf))
+
+        query = self.buildCreateTable(sparkdf)
+        session.execute(query)
+        time.sleep(3)
+        cluster.shutdown()
+
+    def buildCreateTable(self, sparkdf):
+        """
+        Builds a simple Cassandra query for creating the table.
+        Column names follow the sparkdf headers; the added __Id column
+        is used as the primary key for the table.
+        """
+        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+        if sparkdf is not None:
+            col_list = sparkdf.schema.names
+            # To maintain the column name case sensitivity
+            for col in col_list:
+                query = query + ' "' + str(col) + '"' + " text " + ","
+
+        # Creating __Id column as Primary Key
+        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+        self.logger.debug("Create table query " + query)
+        return query
+
+    def buildDeleteTable(self, sparkdf):
+        """
+        Builds a simple Cassandra query for deleting the table.
+        """
+        query = "DROP TABLE IF EXISTS " + self.tableName + " ;"
+        self.logger.debug("Delete table query " + query)
+        return query
+
+    def buildKeyspaceQuery(self, keyspace):
+        """
+        Builds a Cassandra query for creating the keyspace.
+        """
+        query = (
+            "CREATE KEYSPACE IF NOT EXISTS "
+            + keyspace
+            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"
+        )
+        return query
diff --git a/src/sink/DefaultSparkSink.py b/src/sink/DefaultSparkSink.py
index 30f48f7..a66b6ee 100644
--- a/src/sink/DefaultSparkSink.py
+++ b/src/sink/DefaultSparkSink.py
@@ -1,65 +1,65 @@
-# ==================================================================================
-#
-# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ==================================================================================
-
-"""
-@ModuleName: Sink Default Class
-"""
-from sink.Base import Sink
-class DefaultSparkSink(Sink):
-    """
-    @Class: Default Spark Sink
-    """
-    def __init__(self, classflavour):
-        """
-        @Method: Constructor
-        @Input: classflavour
-        """
-        self.ClassType = "Default"
-        self.flavour = classflavour
-        self.logger = None
-        classconfig = None
-        sparkconfig = None
-        self.sparkloadkey = None
-        self.writetype = None
-    def init(self, sparkhelper, confighelper, inputdict):
-        """
-        @Method: init
-        @Inputs: sparkhelper
-                 confighelper
-                 inputdict
-        """
-        self.logger = confighelper.getLogger()
-        classconfig, sparkconfig = confighelper.getClassConfig(self)
-        self.sparkloadkey = classconfig["SparkLoadKey"]
-        self.writetype = classconfig["WriteMode"]
-        for key in sparkconfig:
-            value = sparkconfig[key]
-            print("Spark Config", key, sparkconfig[key])
-            if value.startswith('$Input$'):
-                inputkey = value[7:]
-                sparkhelper.addConf(key, inputdict[inputkey])
-            else:
-                sparkhelper.addConf(key, value)
-    def write(self, sparksession, sparkdf):
-        """
-        @Method: write
-        @Inputs: sparksession
-                 sparkdf
-        """
-        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()
-        self.logger.debug("Data written to Sink " + self.flavour)
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Sink Default Class
+"""
+from sink.Base import Sink
+class DefaultSparkSink(Sink):
+    """
+    @Class: Default Spark Sink
+    """
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input: classflavour
+        """
+        self.ClassType = "Default"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig = None
+        self.sparkloadkey = None
+        self.writetype = None
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config", key, sparkconfig[key])
+            if value.startswith('$Input$'):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: write
+        @Inputs: sparksession
+                 sparkdf
+        """
+        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()
+        self.logger.debug("Data written to Sink " + self.flavour)
--
2.16.6
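For reference, here is a minimal standalone sketch of the CQL that CassandraSink.buildCreateTable assembles. The build_create_table helper and the sample table/column names below are illustrative assumptions for this note, not code shipped in the patch:

    # Mirrors the string-building in CassandraSink.buildCreateTable above:
    # every DataFrame column becomes a quoted (case-sensitive) text column,
    # and the synthetic "__Id" column that write() adds via
    # monotonically_increasing_id() is the sole primary key.
    def build_create_table(table_name, col_list):
        query = "CREATE TABLE " + table_name + " ( \"__Id\" BIGINT,"
        for col in col_list:
            query = query + ' "' + str(col) + '"' + " text " + ","
        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
        return query

    print(build_create_table("sample_table", ["colA", "colB"]))
    # CREATE TABLE sample_table ( "__Id" BIGINT, "colA" text , "colB" text ,PRIMARY KEY("__Id"));

Since create_table() first executes the DROP TABLE IF EXISTS statement from buildDeleteTable and then recreates the table before the "append"-mode write, each run leaves the sink table holding only the data from the most recent write.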