Adding files for sink functionality for data extraction module 50/9250/1
author SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Thu, 13 Oct 2022 05:42:26 +0000 (11:12 +0530)
committer SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Thu, 13 Oct 2022 05:52:46 +0000 (11:22 +0530)
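
This introduces the sink side of the data extraction module: a Cassandra sink
that recreates a per-trainingjob table on every write, and a default Spark
sink driven purely by configuration. A minimal sketch of the intended call
sequence (driver-side names are illustrative; sparkhelper, confighelper,
sparksession and sparkdf are supplied by the surrounding pipeline):

    from sink.CassandraSink import CassandraSink

    sink = CassandraSink("CassandraSink")
    sink.init(sparkhelper, confighelper, {"CollectionName": "trainingjob1"})
    sink.write(sparksession, sparkdf)
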
Issue-Id: AIMLFW-2

Signed-off-by: SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Change-Id: Ia4b9c1af3878f9b5eae79528610ee5c65d88108f

src/sink/CassandraSink.py [new file with mode: 0644]
src/sink/DefaultSparkSink.py [new file with mode: 0644]
src/sink/SinkClassConfig.ini [new file with mode: 0644]
src/sink/__init__.py [new file with mode: 0644]

diff --git a/src/sink/CassandraSink.py b/src/sink/CassandraSink.py
new file mode 100644 (file)
index 0000000..e095d67
--- /dev/null
@@ -0,0 +1,185 @@
+# ==================================================================================\r
+#\r
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
+#\r
+#   Licensed under the Apache License, Version 2.0 (the "License");\r
+#   you may not use this file except in compliance with the License.\r
+#   You may obtain a copy of the License at\r
+#\r
+#          http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#   Unless required by applicable law or agreed to in writing, software\r
+#   distributed under the License is distributed on an "AS IS" BASIS,\r
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#   See the License for the specific language governing permissions and\r
+#   limitations under the License.\r
+#\r
+# ==================================================================================\r
+\r
+"""\r
+@ModuleName: Sink Cassandra Class\r
+"""\r
+from sink.Base import Sink\r
+from cassandra.cluster import Cluster\r
+from cassandra.auth import PlainTextAuthProvider\r
+import time\r
+from pyspark.sql.functions import monotonically_increasing_id\r
+\r
+\r
+class CassandraSink(Sink):\r
+    """\r
+    @Class: Cassandra Sink\r
+\r
+    Writes the queried data into a Cassandra table named after the given\r
+    trainingjob, creating the keyspace first if it does not exist. The\r
+    table is dropped and recreated on every write.\r
+    """\r
+\r
+    def __init__(self, classflavour):\r
+        """\r
+        @Method: Constructor\r
+        @Input : classflavour\r
+        """\r
+        self.ClassType = "Custom"\r
+        self.flavour = classflavour\r
+        self.logger = None\r
+        self.sparkloadkey = None\r
+        self.writetype = None\r
+        self.tableName = None\r
+        self.keyspace = None\r
+        self.user = None\r
+        self.passw = None\r
+        self.host = None\r
+        self.port = None\r
+\r
+    def init(self, sparkhelper, confighelper, inputdict):\r
+        """\r
+        @Method: init\r
+        @Inputs: sparkhelper\r
+                 confighelper\r
+                 inputdict\r
+        """\r
+        self.logger = confighelper.getLogger()\r
+\r
+        self.tableName = inputdict["CollectionName"]\r
+        classconfig, sparkconfig = confighelper.getClassConfig(self)\r
+        envConf = confighelper.getEnvConfig()\r
+\r
+        self.keyspace = envConf["cassandra_sinkdb"]\r
+        self.host = envConf["fs_db_ip"]\r
+        self.port = envConf["fs_db_port"]\r
+        self.user = envConf["fs_db_user"]\r
+        self.passw = envConf["fs_db_password"]\r
+\r
+        self.sparkloadkey = classconfig["SparkLoadKey"]\r
+        self.writetype = classconfig["WriteMode"]\r
+\r
+        self.logger.debug(\r
+            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)\r
+        )\r
+\r
+        # Push the class-specific Spark options to the Spark session builder.\r
+        # A value of the form "$Input$<key>" is resolved from inputdict at runtime.\r
+        for key in sparkconfig:\r
+            value = sparkconfig[key]\r
+            self.logger.debug("Spark Config " + str(key) + " " + str(value))\r
+            if value.startswith("$Input$"):\r
+                inputkey = value[7:]\r
+                sparkhelper.addConf(key, inputdict[inputkey])\r
+            else:\r
+                sparkhelper.addConf(key, value)\r
+\r
+    def write(self, sparksession, sparkdf):\r
+        """\r
+        @Method: write\r
+        @Input : sparksession\r
+                 sparkdf\r
+        Writes the data from sparkdf into the Cassandra table.\r
+        """\r
+        self.logger.debug("Data writing to Sink " + self.flavour)\r
+        self.create_table(sparkdf)\r
+        # Add a monotonically increasing "__Id" column to act as the primary\r
+        # key, since Cassandra requires one and the extracted data has none.\r
+        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())\r
+\r
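+        # Connection and auth details are passed as per-write options, in\r
+        # addition to any session-level Spark conf registered in init().\r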
+        write_options = {\r
+            "table": self.tableName,\r
+            "keyspace": self.keyspace,\r
+            "spark.cassandra.connection.host": self.host,\r
+            "spark.cassandra.connection.port": self.port,\r
+            "spark.cassandra.auth.username": self.user,\r
+            "spark.cassandra.auth.password": self.passw,\r
+        }\r
+\r
+        # The table was just dropped and recreated in create_table(), so\r
+        # appending here amounts to a full overwrite of the trainingjob data.\r
+        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(\r
+            "append"\r
+        ).save()\r
+\r
+        self.logger.debug(\r
+            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName\r
+        )\r
+\r
+    def create_table(self, sparkdf):\r
+        """\r
+        Recreates the table in the Cassandra db as per the trainingjob name.\r
+        Creates the keyspace if it does not exist.\r
+        """\r
+        self.logger.debug("Creating table...")\r
+\r
+        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)\r
+        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)\r
+\r
+        session_default = cluster.connect()\r
+        create_ks_query = self.buildKeyspaceQuery(self.keyspace)\r
+        session_default.execute(create_ks_query)\r
+\r
+        session = cluster.connect(self.keyspace)\r
+\r
+        # Drop any stale table left by a previous run, then recreate it\r
+        # from the current dataframe schema.\r
+        session.execute(self.buildDeleteTable(sparkdf))\r
+        query = self.buildCreateTable(sparkdf)\r
+        session.execute(query)\r
+        # Give the schema change a moment to settle before writing.\r
+        time.sleep(3)\r
+        cluster.shutdown()\r
+\r
+    def buildCreateTable(self, sparkdf):\r
+        """\r
+        Builds a simple Cassandra query for creating the table.\r
+        Column names follow the sparkdf headers; the synthetic __Id column\r
+        added in write() serves as the primary key.\r
+        """\r
+        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"\r
+        if sparkdf is not None:\r
+            col_list = sparkdf.schema.names\r
+            # Quote each column name to preserve its case sensitivity;\r
+            # all data columns are stored as text.\r
+            for col in col_list:\r
+                query = query + ' "' + str(col) + '"' + " text " + ","\r
+\r
+        # Creating __Id column as Primary Key\r
+        query = query + "PRIMARY KEY(" + '"__Id"' + "));"\r
+        self.logger.debug("Create table query " + query)\r
+        return query\r
+\r
+    def buildDeleteTable(self, sparkdf):\r
+        """\r
+        Builds a simple Cassandra query for dropping the table.\r
+        """\r
+        query = "DROP TABLE IF EXISTS " + self.tableName + " ;"\r
+        self.logger.debug("Delete table query " + query)\r
+        return query\r
+\r
+    def buildKeyspaceQuery(self, keyspace):\r
+        """\r
+        Builds the Cassandra query for creating the keyspace\r
+        (SimpleStrategy with a replication factor of 2).\r
+        """\r
+        query = (\r
+            "CREATE KEYSPACE IF NOT EXISTS "\r
+            + keyspace\r
+            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor'  : 2 };"\r
+        )\r
+        return query\r
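+\r
+# For reference, a sketch of the CQL these builders emit for a dataframe with\r
+# illustrative columns "pdcpBytesDl" and "pdcpBytesUl" and keyspace "ts"\r
+# (names are examples, not part of this change):\r
+#\r
+#   CREATE KEYSPACE IF NOT EXISTS ts WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor'  : 2 };\r
+#   DROP TABLE IF EXISTS trainingjob1 ;\r
+#   CREATE TABLE trainingjob1 ( "__Id" BIGINT, "pdcpBytesDl" text , "pdcpBytesUl" text ,PRIMARY KEY("__Id"));\r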
diff --git a/src/sink/DefaultSparkSink.py b/src/sink/DefaultSparkSink.py
new file mode 100644 (file)
index 0000000..30f48f7
--- /dev/null
@@ -0,0 +1,65 @@
+# ==================================================================================\r
+#\r
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
+#\r
+#   Licensed under the Apache License, Version 2.0 (the "License");\r
+#   you may not use this file except in compliance with the License.\r
+#   You may obtain a copy of the License at\r
+#\r
+#          http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#   Unless required by applicable law or agreed to in writing, software\r
+#   distributed under the License is distributed on an "AS IS" BASIS,\r
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#   See the License for the specific language governing permissions and\r
+#   limitations under the License.\r
+#\r
+# ==================================================================================\r
+\r
+"""\r
+@ModuleName: Sink Default Class\r
+"""\r
+from sink.Base import Sink\r
+\r
+\r
+class DefaultSparkSink(Sink):\r
+    """\r
+    @Class: Default Spark Sink\r
+    """\r
+\r
+    def __init__(self, classflavour):\r
+        """\r
+        @Method: Constructor\r
+        @Input : classflavour\r
+        """\r
+        self.ClassType = "Default"\r
+        self.flavour = classflavour\r
+        self.logger = None\r
+        self.sparkloadkey = None\r
+        self.writetype = None\r
+\r
+    def init(self, sparkhelper, confighelper, inputdict):\r
+        """\r
+        @Method: init\r
+        @Inputs: sparkhelper\r
+                 confighelper\r
+                 inputdict\r
+        """\r
+        self.logger = confighelper.getLogger()\r
+        classconfig, sparkconfig = confighelper.getClassConfig(self)\r
+        self.sparkloadkey = classconfig["SparkLoadKey"]\r
+        self.writetype = classconfig["WriteMode"]\r
+        # As in CassandraSink, "$Input$<key>" values are resolved from inputdict.\r
+        for key in sparkconfig:\r
+            value = sparkconfig[key]\r
+            self.logger.debug("Spark Config " + str(key) + " " + str(value))\r
+            if value.startswith("$Input$"):\r
+                inputkey = value[7:]\r
+                sparkhelper.addConf(key, inputdict[inputkey])\r
+            else:\r
+                sparkhelper.addConf(key, value)\r
+\r
+    def write(self, sparksession, sparkdf):\r
+        """\r
+        @Method: write\r
+        @Input : sparksession\r
+                 sparkdf\r
+        """\r
+        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()\r
+        self.logger.debug("Data written to Sink " + self.flavour)\r
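+\r
+# Any new sink flavour is expected to follow the same contract as the two\r
+# classes in this change: a constructor taking the flavour name, an\r
+# init(sparkhelper, confighelper, inputdict) hook that registers Spark\r
+# options, and a write(sparksession, sparkdf) method that persists the\r
+# dataframe.\r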
diff --git a/src/sink/SinkClassConfig.ini b/src/sink/SinkClassConfig.ini
new file mode 100644 (file)
index 0000000..7a12e9a
--- /dev/null
@@ -0,0 +1,36 @@
+[ModuleDetails]
+ModuleName=sink
+BaseClassName=Sink
+DefaultClassName=DefaultSparkSink
+
+[CassandraSink]
+Name=CassandraDB
+Description=Spark connector that supports writing to CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+WriteMode=overwrite
+
+[CassandraSink.SparkConf]
+spark.cassandra.output.consistency.level=$ENV{Cassandra_ConsistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSink.Inputs]
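+# Keys listed under a <ClassName>.Inputs section are supplied at runtime via
+# the inputdict passed to init(); $Input$<key> values in the SparkConf section
+# are resolved against the same dict, and $ENV{<key>} values are presumably
+# resolved through the [EnvConfig] mapping below.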
+
+[EnvConfig]
+FS_DB_USER=FS_DB_USER
+FS_DB_PASSWORD=FS_DB_PASSWORD
+FS_DB_IP=FS_DB_IP
+FS_DB_PORT=FS_DB_PORT
+FS_DB_NAME=FS_DB_KEYSPACE_NAME
+
+Cassandra_SinkIP=FS_DB_IP
+Cassandra_SinkPort=FS_DB_PORT
+Cassandra_SinkUser=FS_DB_USER
+Cassandra_SinkPassword=FS_DB_PASSWORD
+Cassandra_ConsistencyLevel=Cassandra_CONS_LEVEL
+
+Cassandra_SinkDB=FS_DB_KEYSPACE_NAME
diff --git a/src/sink/__init__.py b/src/sink/__init__.py
new file mode 100644 (file)
index 0000000..e69de29