-# ==================================================================================\r
-#\r
-# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
-#\r
-# Licensed under the Apache License, Version 2.0 (the "License");\r
-# you may not use this file except in compliance with the License.\r
-# You may obtain a copy of the License at\r
-#\r
-# http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-# Unless required by applicable law or agreed to in writing, software\r
-# distributed under the License is distributed on an "AS IS" BASIS,\r
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-# See the License for the specific language governing permissions and\r
-# limitations under the License.\r
-#\r
-# ==================================================================================\r
-\r
-"""\r
-@ModuleName: SInk Default Class\r
-"""\r
-from sink.Base import Sink\r
-from pyspark import SparkContext, SparkConf\r
-from pyspark.sql import SQLContext, SparkSession\r
-from pyspark.sql.types import *\r
-from cassandra.cluster import Cluster\r
-from cassandra.auth import PlainTextAuthProvider\r
-import time\r
-import json\r
-from pyspark.sql.functions import monotonically_increasing_id \r
-\r
-\r
-class CassandraSink(Sink):\r
- """\r
- Writes the queried data in to cassandra table.\r
- It creates the table with given trainingjob name\r
- It creates Keyspaces if doesn't exist.\r
- @Class: Default Spark Sink\r
- """\r
-\r
- def __init__(self, classflavour):\r
- """\r
- @Methond: Constructor\r
- @Input : classflavor\r
- """\r
- self.ClassType = "Custom"\r
- self.flavour = classflavour\r
- self.logger = None\r
- classconfig = None\r
- sparkconfig = None\r
- self.sparkloadkey = None\r
- self.writetype = None\r
- self.tableName = None\r
- self.keyspace = None\r
- self.user = None\r
- self.passw = None\r
- self.host = None\r
- self.port = None\r
-\r
- def init(self, sparkhelper, confighelper, inputdict):\r
- """\r
- @Methond:init\r
- @Inputs: sparkhelper\r
- confighelper\r
- inputdict\r
- """\r
- self.logger = confighelper.getLogger()\r
-\r
- self.tableName = inputdict["CollectionName"]\r
- classconfig, sparkconfig = confighelper.getClassConfig(self)\r
- envConf = confighelper.getEnvConfig()\r
-\r
- self.keyspace = envConf["cassandra_sinkdb"]\r
- self.host = envConf["fs_db_ip"]\r
- self.port = envConf["fs_db_port"]\r
- self.user = envConf["fs_db_user"]\r
- self.passw = envConf["fs_db_password"]\r
-\r
- self.sparkloadkey = classconfig["SparkLoadKey"]\r
- self.writetype = classconfig["WriteMode"]\r
-\r
- self.logger.debug(\r
- "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)\r
- )\r
-\r
- for key in sparkconfig:\r
- value = sparkconfig[key]\r
- print("Spark Config", key, sparkconfig[key])\r
- if value.startswith("$Input$"):\r
- inputkey = value[7:]\r
- sparkhelper.addConf(key, inputdict[inputkey])\r
- else:\r
- sparkhelper.addConf(key, value)\r
-\r
- def write(self, sparksession, sparkdf):\r
- """\r
- @Method: write the data from sparkdf in to cassandra table\r
- @input : sparksession\r
- sparkdf\r
- """\r
- self.logger.debug("Data writing to Sink " + self.flavour)\r
- self.create_table(sparkdf)\r
- sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())\r
- \r
- \r
- \r
- write_options = {\r
- "table": self.tableName,\r
- "keyspace": self.keyspace,\r
- "spark.cassandra.connection.host": self.host,\r
- "spark.cassandra.connection.port": self.port,\r
- "spark.cassandra.auth.username": self.user,\r
- "spark.cassandra.auth.password": self.passw,\r
- }\r
-\r
- sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(\r
- "append"\r
- ).save()\r
-\r
- self.logger.debug(\r
- "*** Data written to Sink *** " + self.keyspace + "." + self.tableName\r
- )\r
- \r
-\r
-\r
- def create_table(self, sparkdf):\r
- """\r
- Recreates table in cassandra db as per trainingjob name\r
- Creates Keyspaces , if doesn't exist\r
- """\r
- self.logger.debug("Creating table...")\r
-\r
- auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)\r
- cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)\r
-\r
- session_default = cluster.connect()\r
- create_ks_query = self.buildKeyspaceQuery(self.keyspace)\r
- session_default.execute(create_ks_query)\r
-\r
- session = cluster.connect(self.keyspace)\r
- \r
- \r
- session.execute(self.buildDeleteTable(sparkdf))\r
- \r
- query = self.buildCreateTable(sparkdf)\r
- session.execute(query)\r
- time.sleep(3)\r
- cluster.shutdown()\r
-\r
- def buildCreateTable(self, sparkdf):\r
- """\r
- Builds simple cassandra query for creating table.\r
- Columns names as per sparkdf headers, choosing first header __Id in sparkdf\r
- as primary key for table\r
- """\r
- query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"\r
- if sparkdf is not None:\r
- col_list = sparkdf.schema.names\r
- # To maintain the column name case sensitivity\r
- for col in col_list:\r
- query = query + ' "' + str(col) + '"' + " text " + ","\r
-\r
- # Creating __Id column as Primary Key\r
- query = query + "PRIMARY KEY(" + '"__Id"' + "));"\r
- self.logger.debug("Create table query " + query)\r
- return query\r
-\r
- def buildDeleteTable(self, sparkdf):\r
- """\r
- Builds simple cassandra query for deleting table\r
- """\r
- query = "DROP TABLE IF EXISTS " + self.tableName + " ;"\r
- self.logger.debug("Delete table query " + query)\r
- return query\r
-\r
- def buildKeyspaceQuery(self, keyspace):\r
- """\r
- Builds cassandra query for creating keyspace\r
- """\r
- query = (\r
- "CREATE KEYSPACE IF NOT EXISTS "\r
- + keyspace\r
- + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"\r
- )\r
- return query\r
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Sink Default Class
+"""
+from sink.Base import Sink
+from cassandra.cluster import Cluster
+from cassandra.auth import PlainTextAuthProvider
+import time
+from pyspark.sql.functions import monotonically_increasing_id
+
+
+class CassandraSink(Sink):
+ """
+ Writes the queried data in to cassandra table.
+ It creates the table with given trainingjob name
+ It creates Keyspaces if doesn't exist.
+ @Class: Default Spark Sink
+ """
+
+ def __init__(self, classflavour):
+ """
+        @Method: Constructor
+        @Input : classflavour
+ """
+ self.ClassType = "Custom"
+ self.flavour = classflavour
+ self.logger = None
+ self.sparkloadkey = None
+ self.writetype = None
+ self.tableName = None
+ self.keyspace = None
+ self.user = None
+ self.passw = None
+ self.host = None
+ self.port = None
+
+ def init(self, sparkhelper, confighelper, inputdict):
+ """
+        @Method: init
+ @Inputs: sparkhelper
+ confighelper
+ inputdict
+ """
+ self.logger = confighelper.getLogger()
+
+ self.tableName = inputdict["CollectionName"]
+ classconfig, sparkconfig = confighelper.getClassConfig(self)
+ envConf = confighelper.getEnvConfig()
+
+ self.keyspace = envConf["cassandra_sinkdb"]
+ self.host = envConf["fs_db_ip"]
+ self.port = envConf["fs_db_port"]
+ self.user = envConf["fs_db_user"]
+ self.passw = envConf["fs_db_password"]
+
+ self.sparkloadkey = classconfig["SparkLoadKey"]
+ self.writetype = classconfig["WriteMode"]
+
+ self.logger.debug(
+ "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)
+ )
+
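+        # Values prefixed with "$Input$" are placeholders resolved from
+        # inputdict at init time. For example, a (hypothetical) sparkconfig
+        # entry {"spark.cassandra.output.batch.size.rows": "$Input$BatchRows"}
+        # would set that option to inputdict["BatchRows"].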
+ for key in sparkconfig:
+ value = sparkconfig[key]
+ print("Spark Config", key, sparkconfig[key])
+ if value.startswith("$Input$"):
+ inputkey = value[7:]
+ sparkhelper.addConf(key, inputdict[inputkey])
+ else:
+ sparkhelper.addConf(key, value)
+
+ def write(self, sparksession, sparkdf):
+ """
+        @Method: write - writes the data from sparkdf into the cassandra table
+        @Input : sparksession
+ sparkdf
+ """
+ self.logger.debug("Data writing to Sink " + self.flavour)
+ self.create_table(sparkdf)
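+        # monotonically_increasing_id() generates 64-bit ids that are unique
+        # and increasing but not consecutive; the "__Id" column exists only to
+        # give Cassandra a primary key.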
+ sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
+
+ write_options = {
+ "table": self.tableName,
+ "keyspace": self.keyspace,
+ "spark.cassandra.connection.host": self.host,
+ "spark.cassandra.connection.port": self.port,
+ "spark.cassandra.auth.username": self.user,
+ "spark.cassandra.auth.password": self.passw,
+ }
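+        # spark-cassandra-connector options; self.sparkloadkey is expected to
+        # name that data source (typically "org.apache.spark.sql.cassandra",
+        # supplied through the class config).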
+
+        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(
+            self.writetype
+        ).save()
+
+ self.logger.debug(
+ "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
+ )
+
+ def create_table(self, sparkdf):
+ """
+        Recreates the table in the cassandra db as per the trainingjob name
+        and creates the keyspace if it doesn't exist
+ """
+ self.logger.debug("Creating table...")
+
+ auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)
+ cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)
+
+ session_default = cluster.connect()
+ create_ks_query = self.buildKeyspaceQuery(self.keyspace)
+ session_default.execute(create_ks_query)
+
+ session = cluster.connect(self.keyspace)
+
+        session.execute(self.buildDeleteTable())
+
+ query = self.buildCreateTable(sparkdf)
+ session.execute(query)
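+        # Crude wait to let the schema change propagate before Spark writes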
+ time.sleep(3)
+ cluster.shutdown()
+
+ def buildCreateTable(self, sparkdf):
+ """
+        Builds a simple cassandra query for creating the table.
+        Column names follow the sparkdf headers; the generated "__Id" column
+        is used as the table's primary key.
+ """
+ query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+ if sparkdf is not None:
+ col_list = sparkdf.schema.names
+ # To maintain the column name case sensitivity
+ for col in col_list:
+ query = query + ' "' + str(col) + '"' + " text " + ","
+
+ # Creating __Id column as Primary Key
+ query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+ self.logger.debug("Create table query " + query)
+ return query
+
+    def buildDeleteTable(self):
+        """
+        Builds a simple cassandra query for dropping the table
+        """
+ query = "DROP TABLE IF EXISTS " + self.tableName + " ;"
+ self.logger.debug("Delete table query " + query)
+ return query
+
+ def buildKeyspaceQuery(self, keyspace):
+ """
+ Builds cassandra query for creating keyspace
+ """
+ query = (
+ "CREATE KEYSPACE IF NOT EXISTS "
+ + keyspace
+ + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"
+ )
+ return query
-# ==================================================================================\r
-#\r
-# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
-#\r
-# Licensed under the Apache License, Version 2.0 (the "License");\r
-# you may not use this file except in compliance with the License.\r
-# You may obtain a copy of the License at\r
-#\r
-# http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-# Unless required by applicable law or agreed to in writing, software\r
-# distributed under the License is distributed on an "AS IS" BASIS,\r
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-# See the License for the specific language governing permissions and\r
-# limitations under the License.\r
-#\r
-# ==================================================================================\r
-\r
-"""\r
-@ModuleName: Sink Default Class\r
-"""\r
-from sink.Base import Sink\r
-class DefaultSparkSink(Sink):\r
- """\r
- @Class: Default Spark Sink\r
- """\r
- def __init__(self, classflavour):\r
- """\r
- @Methond: Constructor\r
- @Input : classflavor\r
- """\r
- self.ClassType="Default"\r
- self.flavour = classflavour\r
- self.logger = None\r
- classconfig = None\r
- sparkconfig = None\r
- self.sparkloadkey = None\r
- self.writetype = None\r
- def init(self, sparkhelper, confighelper, inputdict):\r
- """\r
- @Methond:init\r
- @Inputs: sparkhelper\r
- confighelper\r
- inputdict\r
- """\r
- self.logger= confighelper.getLogger()\r
- classconfig,sparkconfig = confighelper.getClassConfig(self)\r
- self.sparkloadkey=classconfig["SparkLoadKey"]\r
- self.writetype=classconfig["WriteMode"]\r
- for key in sparkconfig:\r
- value = sparkconfig[key]\r
- print("Spark Config",key,sparkconfig[key])\r
- if value.startswith('$Input$'):\r
- inputkey = value[7:]\r
- sparkhelper.addConf(key,inputdict[inputkey])\r
- else:\r
- sparkhelper.addConf(key,value)\r
- def write(self, sparksession, sparkdf):\r
- """\r
- @Method: write\r
- @input : sparksession\r
- sparkdf\r
- """\r
- sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()\r
- self.logger.debug("Data written to Sink"+self.flavour)\r
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Sink Default Class
+"""
+from sink.Base import Sink
+
+
+class DefaultSparkSink(Sink):
+ """
+ @Class: Default Spark Sink
+ """
+
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input : classflavour
+        """
+        self.ClassType = "Default"
+        self.flavour = classflavour
+        self.logger = None
+        self.sparkloadkey = None
+        self.writetype = None
+
+ def init(self, sparkhelper, confighelper, inputdict):
+ """
+        @Method: init
+ @Inputs: sparkhelper
+ confighelper
+ inputdict
+ """
+        self.logger = confighelper.getLogger()
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            self.logger.debug("Spark Config " + str(key) + " " + str(value))
+            if value.startswith("$Input$"):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
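+        # As in CassandraSink, "$Input$<key>" values are placeholders resolved
+        # from inputdict; all other values are passed to Spark verbatim.
+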
+ def write(self, sparksession, sparkdf):
+ """
+ @Method: write
+        @Input : sparksession
+ sparkdf
+ """
+ sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()
+ self.logger.debug("Data written to Sink"+self.flavour)