--- /dev/null
+# ==================================================================================\r
+#\r
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+#\r
+# ==================================================================================\r
+\r
+"""\r
+@ModuleName: Cassandra Sink Class\r
+"""\r
+from sink.Base import Sink\r
+from cassandra.cluster import Cluster\r
+from cassandra.auth import PlainTextAuthProvider\r
+from pyspark.sql.functions import monotonically_increasing_id\r
+import time\r
+\r
+\r
+class CassandraSink(Sink):\r
+ """\r
+ Writes the queried data in to cassandra table.\r
+ It creates the table with given trainingjob name\r
+ It creates Keyspaces if doesn't exist.\r
+ @Class: Default Spark Sink\r
+ """\r
+\r
+ def __init__(self, classflavour):\r
+ """\r
+ @Methond: Constructor\r
+ @Input : classflavor\r
+ """\r
+ self.ClassType = "Custom"\r
+ self.flavour = classflavour\r
+ self.logger = None\r
+ self.sparkloadkey = None\r
+ self.writetype = None\r
+ self.tableName = None\r
+ self.keyspace = None\r
+ self.user = None\r
+ self.passw = None\r
+ self.host = None\r
+ self.port = None\r
+\r
+ def init(self, sparkhelper, confighelper, inputdict):\r
+ """\r
+ @Methond:init\r
+ @Inputs: sparkhelper\r
+ confighelper\r
+ inputdict\r
+ """\r
+ self.logger = confighelper.getLogger()\r
+\r
+ self.tableName = inputdict["CollectionName"]\r
+ classconfig, sparkconfig = confighelper.getClassConfig(self)\r
+ envConf = confighelper.getEnvConfig()\r
+\r
+ self.keyspace = envConf["cassandra_sinkdb"]\r
+ self.host = envConf["fs_db_ip"]\r
+ self.port = envConf["fs_db_port"]\r
+ self.user = envConf["fs_db_user"]\r
+ self.passw = envConf["fs_db_password"]\r
+\r
+ self.sparkloadkey = classconfig["SparkLoadKey"]\r
+ self.writetype = classconfig["WriteMode"]\r
+\r
+ self.logger.debug(\r
+ "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)\r
+ )\r
+\r
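+        # Spark conf values may reference runtime inputs via the\r
+        # "$Input$<key>" convention; those are resolved from inputdict below,\r
+        # while all other values are applied verbatim.\r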
+        for key in sparkconfig:\r
+            value = sparkconfig[key]\r
+            self.logger.debug("Spark Config " + str(key) + " " + str(value))\r
+            if value.startswith("$Input$"):\r
+                inputkey = value[7:]\r
+                sparkhelper.addConf(key, inputdict[inputkey])\r
+            else:\r
+                sparkhelper.addConf(key, value)\r
+\r
+ def write(self, sparksession, sparkdf):\r
+ """\r
+ @Method: write the data from sparkdf in to cassandra table\r
+ @input : sparksession\r
+ sparkdf\r
+ """\r
+ self.logger.debug("Data writing to Sink " + self.flavour)\r
+ self.create_table(sparkdf)\r
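+        # monotonically_increasing_id() gives each row a unique (though not\r
+        # consecutive) 64-bit id, used below as the Cassandra primary key.\r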
+ sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())\r
+\r
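+        # Target table/keyspace plus connection and auth settings for the\r
+        # Spark Cassandra connector.\r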
+ write_options = {\r
+ "table": self.tableName,\r
+ "keyspace": self.keyspace,\r
+ "spark.cassandra.connection.host": self.host,\r
+ "spark.cassandra.connection.port": self.port,\r
+ "spark.cassandra.auth.username": self.user,\r
+ "spark.cassandra.auth.password": self.passw,\r
+ }\r
+\r
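+        # self.sparkloadkey is expected to name the connector format\r
+        # (typically "org.apache.spark.sql.cassandra"), as set in class config.\r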
+ sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(\r
+ "append"\r
+ ).save()\r
+\r
+ self.logger.debug(\r
+ "*** Data written to Sink *** " + self.keyspace + "." + self.tableName\r
+ )\r
+\r
+ def create_table(self, sparkdf):\r
+ """\r
+ Recreates table in cassandra db as per trainingjob name\r
+ Creates Keyspaces , if doesn't exist\r
+ """\r
+ self.logger.debug("Creating table...")\r
+\r
+ auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)\r
+        # cast the port in case the env config supplies it as a string\r
+        cluster = Cluster([self.host], port=int(self.port), auth_provider=auth_provider)\r
+\r
+ session_default = cluster.connect()\r
+ create_ks_query = self.buildKeyspaceQuery(self.keyspace)\r
+ session_default.execute(create_ks_query)\r
+\r
+ session = cluster.connect(self.keyspace)\r
+\r
+        # drop any stale table from a previous run before recreating it\r
+        session.execute(self.buildDeleteTable(sparkdf))\r
+\r
+ query = self.buildCreateTable(sparkdf)\r
+ session.execute(query)\r
+        time.sleep(3)  # brief pause so the schema change settles before writing\r
+ cluster.shutdown()\r
+\r
+ def buildCreateTable(self, sparkdf):\r
+ """\r
+ Builds simple cassandra query for creating table.\r
+ Columns names as per sparkdf headers, choosing first header __Id in sparkdf\r
+ as primary key for table\r
+ """\r
+ query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"\r
+ if sparkdf is not None:\r
+ col_list = sparkdf.schema.names\r
+            # Quote the column names so Cassandra preserves their case\r
+ for col in col_list:\r
+ query = query + ' "' + str(col) + '"' + " text " + ","\r
+\r
+ # Creating __Id column as Primary Key\r
+ query = query + "PRIMARY KEY(" + '"__Id"' + "));"\r
+ self.logger.debug("Create table query " + query)\r
+ return query\r
+\r
+ def buildDeleteTable(self, sparkdf):\r
+ """\r
+        Builds a simple Cassandra DROP TABLE query.\r
+ """\r
+ query = "DROP TABLE IF EXISTS " + self.tableName + " ;"\r
+ self.logger.debug("Delete table query " + query)\r
+ return query\r
+\r
+ def buildKeyspaceQuery(self, keyspace):\r
+ """\r
+        Builds the Cassandra query for creating the keyspace if it does not exist.\r
+ """\r
+ query = (\r
+ "CREATE KEYSPACE IF NOT EXISTS "\r
+ + keyspace\r
+ + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"\r
+ )\r
+ return query\r
--- /dev/null
+# ==================================================================================\r
+#\r
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+#\r
+# ==================================================================================\r
+\r
+"""\r
+@ModuleName: Default Sink Class\r
+"""\r
+from sink.Base import Sink\r
+\r
+\r
+class DefaultSparkSink(Sink):\r
+ """\r
+ @Class: Default Spark Sink\r
+ """\r
+    def __init__(self, classflavour):\r
+        """\r
+        @Method: Constructor\r
+        @Input : classflavour\r
+        """\r
+        self.ClassType = "Default"\r
+        self.flavour = classflavour\r
+        self.logger = None\r
+        self.sparkloadkey = None\r
+        self.writetype = None\r
+\r
+    def init(self, sparkhelper, confighelper, inputdict):\r
+        """\r
+        @Method: init\r
+        @Inputs: sparkhelper\r
+                 confighelper\r
+                 inputdict\r
+        """\r
+        self.logger = confighelper.getLogger()\r
+        classconfig, sparkconfig = confighelper.getClassConfig(self)\r
+        self.sparkloadkey = classconfig["SparkLoadKey"]\r
+        self.writetype = classconfig["WriteMode"]\r
+        # Resolve "$Input$<key>" placeholders from inputdict; apply all other\r
+        # values to the Spark conf verbatim.\r
+        for key in sparkconfig:\r
+            value = sparkconfig[key]\r
+            self.logger.debug("Spark Config " + str(key) + " " + str(value))\r
+            if value.startswith("$Input$"):\r
+                inputkey = value[7:]\r
+                sparkhelper.addConf(key, inputdict[inputkey])\r
+            else:\r
+                sparkhelper.addConf(key, value)\r
+\r
+    def write(self, sparksession, sparkdf):\r
+        """\r
+        @Method: write\r
+        @input : sparksession\r
+                 sparkdf\r
+        """\r
+        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()\r
+        self.logger.debug("Data written to Sink " + self.flavour)\r