Removing CRLF from a few files 51/9251/1
author SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Thu, 13 Oct 2022 07:10:51 +0000 (12:40 +0530)
committer SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Thu, 13 Oct 2022 07:13:24 +0000 (12:43 +0530)
Issue-Id: AIMLFW-2

Signed-off-by: SANDEEP KUMAR JAISAWAL <s.jaisawal@samsung.com>
Change-Id: I2ba4f43e29f80408d17ce188a6e68c26e3799f15

src/sink/CassandraSink.py
src/sink/DefaultSparkSink.py

index e095d67..736916e 100644
-# ==================================================================================\r
-#\r
-#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
-#\r
-#   Licensed under the Apache License, Version 2.0 (the "License");\r
-#   you may not use this file except in compliance with the License.\r
-#   You may obtain a copy of the License at\r
-#\r
-#          http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-#   Unless required by applicable law or agreed to in writing, software\r
-#   distributed under the License is distributed on an "AS IS" BASIS,\r
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-#   See the License for the specific language governing permissions and\r
-#   limitations under the License.\r
-#\r
-# ==================================================================================\r
-\r
-"""\r
-@ModuleName: SInk Default Class\r
-"""\r
-from sink.Base import Sink\r
-from pyspark import SparkContext, SparkConf\r
-from pyspark.sql import SQLContext, SparkSession\r
-from pyspark.sql.types import *\r
-from cassandra.cluster import Cluster\r
-from cassandra.auth import PlainTextAuthProvider\r
-import time\r
-import json\r
-from pyspark.sql.functions import monotonically_increasing_id \r
-\r
-\r
-class CassandraSink(Sink):\r
-    """\r
-        Writes the queried data in to cassandra table.\r
-        It creates the table with given trainingjob name\r
-        It creates Keyspaces if doesn't exist.\r
-    @Class: Default Spark Sink\r
-    """\r
-\r
-    def __init__(self, classflavour):\r
-        """\r
-        @Methond: Constructor\r
-        @Input : classflavor\r
-        """\r
-        self.ClassType = "Custom"\r
-        self.flavour = classflavour\r
-        self.logger = None\r
-        classconfig = None\r
-        sparkconfig = None\r
-        self.sparkloadkey = None\r
-        self.writetype = None\r
-        self.tableName = None\r
-        self.keyspace = None\r
-        self.user = None\r
-        self.passw = None\r
-        self.host = None\r
-        self.port = None\r
-\r
-    def init(self, sparkhelper, confighelper, inputdict):\r
-        """\r
-        @Methond:init\r
-        @Inputs: sparkhelper\r
-                 confighelper\r
-                 inputdict\r
-        """\r
-        self.logger = confighelper.getLogger()\r
-\r
-        self.tableName = inputdict["CollectionName"]\r
-        classconfig, sparkconfig = confighelper.getClassConfig(self)\r
-        envConf = confighelper.getEnvConfig()\r
-\r
-        self.keyspace = envConf["cassandra_sinkdb"]\r
-        self.host = envConf["fs_db_ip"]\r
-        self.port = envConf["fs_db_port"]\r
-        self.user = envConf["fs_db_user"]\r
-        self.passw = envConf["fs_db_password"]\r
-\r
-        self.sparkloadkey = classconfig["SparkLoadKey"]\r
-        self.writetype = classconfig["WriteMode"]\r
-\r
-        self.logger.debug(\r
-            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)\r
-        )\r
-\r
-        for key in sparkconfig:\r
-            value = sparkconfig[key]\r
-            print("Spark Config", key, sparkconfig[key])\r
-            if value.startswith("$Input$"):\r
-                inputkey = value[7:]\r
-                sparkhelper.addConf(key, inputdict[inputkey])\r
-            else:\r
-                sparkhelper.addConf(key, value)\r
-\r
-    def write(self, sparksession, sparkdf):\r
-        """\r
-        @Method: write the data from sparkdf in to cassandra table\r
-        @input : sparksession\r
-                 sparkdf\r
-        """\r
-        self.logger.debug("Data writing to Sink " + self.flavour)\r
-        self.create_table(sparkdf)\r
-        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())\r
-        \r
-        \r
-        \r
-        write_options = {\r
-            "table": self.tableName,\r
-            "keyspace": self.keyspace,\r
-            "spark.cassandra.connection.host": self.host,\r
-            "spark.cassandra.connection.port": self.port,\r
-            "spark.cassandra.auth.username": self.user,\r
-            "spark.cassandra.auth.password": self.passw,\r
-        }\r
-\r
-        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(\r
-            "append"\r
-        ).save()\r
-\r
-        self.logger.debug(\r
-            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName\r
-        )\r
-        \r
-\r
-\r
-    def create_table(self, sparkdf):\r
-        """\r
-        Recreates table in cassandra db as per trainingjob name\r
-        Creates Keyspaces , if doesn't exist\r
-        """\r
-        self.logger.debug("Creating table...")\r
-\r
-        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)\r
-        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)\r
-\r
-        session_default = cluster.connect()\r
-        create_ks_query = self.buildKeyspaceQuery(self.keyspace)\r
-        session_default.execute(create_ks_query)\r
-\r
-        session = cluster.connect(self.keyspace)\r
-        \r
-        \r
-        session.execute(self.buildDeleteTable(sparkdf))\r
-        \r
-        query = self.buildCreateTable(sparkdf)\r
-        session.execute(query)\r
-        time.sleep(3)\r
-        cluster.shutdown()\r
-\r
-    def buildCreateTable(self, sparkdf):\r
-        """\r
-        Builds simple cassandra query for creating table.\r
-        Columns names as per sparkdf headers, choosing first header __Id in sparkdf\r
-        as primary key for table\r
-        """\r
-        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"\r
-        if sparkdf is not None:\r
-            col_list = sparkdf.schema.names\r
-            # To maintain the column name case sensitivity\r
-            for col in col_list:\r
-                query = query + ' "' + str(col) + '"' + " text " + ","\r
-\r
-        # Creating __Id column as Primary Key\r
-        query = query + "PRIMARY KEY(" + '"__Id"' + "));"\r
-        self.logger.debug("Create table query " + query)\r
-        return query\r
-\r
-    def buildDeleteTable(self, sparkdf):\r
-        """\r
-        Builds simple cassandra query for deleting table\r
-        """\r
-        query = "DROP TABLE IF EXISTS  " + self.tableName + " ;"\r
-        self.logger.debug("Delete table query " + query)\r
-        return query\r
-\r
-    def buildKeyspaceQuery(self, keyspace):\r
-        """\r
-        Builds cassandra query for creating keyspace\r
-        """\r
-        query = (\r
-            "CREATE KEYSPACE IF NOT EXISTS "\r
-            + keyspace\r
-            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor'  : 2 };"\r
-        )\r
-        return query\r
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Cassandra Sink Class
+"""
+from sink.Base import Sink
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SQLContext, SparkSession
+from pyspark.sql.types import *
+from cassandra.cluster import Cluster
+from cassandra.auth import PlainTextAuthProvider
+import time
+import json
+from pyspark.sql.functions import monotonically_increasing_id
+
+
+class CassandraSink(Sink):
+    """
+        Writes the queried data into a Cassandra table.
+        The table is created with the given trainingjob name,
+        and the keyspace is created if it does not exist.
+    @Class: Cassandra Sink
+    """
+
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input : classflavour
+        """
+        self.ClassType = "Custom"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig = None
+        self.sparkloadkey = None
+        self.writetype = None
+        self.tableName = None
+        self.keyspace = None
+        self.user = None
+        self.passw = None
+        self.host = None
+        self.port = None
+
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+
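+        # The trainingjob's CollectionName doubles as the Cassandra table name.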
+        self.tableName = inputdict["CollectionName"]
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        envConf = confighelper.getEnvConfig()
+
+        self.keyspace = envConf["cassandra_sinkdb"]
+        self.host = envConf["fs_db_ip"]
+        self.port = envConf["fs_db_port"]
+        self.user = envConf["fs_db_user"]
+        self.passw = envConf["fs_db_password"]
+
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+
+        self.logger.debug(
+            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)
+        )
+
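+        # Spark conf values prefixed with "$Input$" are indirections: the rest of
+        # the string names a key in inputdict whose value is used as the conf value.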
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config", key, sparkconfig[key])
+            if value.startswith("$Input$"):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: Writes the data from sparkdf into the Cassandra table
+        @Inputs: sparksession
+                 sparkdf
+        """
+        self.logger.debug("Data writing to Sink " + self.flavour)
+        self.create_table(sparkdf)
+        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
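+        # monotonically_increasing_id() yields unique (but not consecutive) 64-bit
+        # IDs; the resulting "__Id" column serves as the Cassandra primary key.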
+
+        write_options = {
+            "table": self.tableName,
+            "keyspace": self.keyspace,
+            "spark.cassandra.connection.host": self.host,
+            "spark.cassandra.connection.port": self.port,
+            "spark.cassandra.auth.username": self.user,
+            "spark.cassandra.auth.password": self.passw,
+        }
+
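+        # The mode is fixed to "append": create_table() has just dropped and
+        # recreated the target table, so the append lands in an empty table.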
+        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(
+            "append"
+        ).save()
+
+        self.logger.debug(
+            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
+        )
+
+    def create_table(self, sparkdf):
+        """
+        Recreates the table in the Cassandra DB as per the trainingjob name.
+        Creates the keyspace if it does not exist.
+        """
+        self.logger.debug("Creating table...")
+
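+        # Plain-text authentication; credentials were read from env config in init().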
+        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)
+        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)
+
+        session_default = cluster.connect()
+        create_ks_query = self.buildKeyspaceQuery(self.keyspace)
+        session_default.execute(create_ks_query)
+
+        session = cluster.connect(self.keyspace)
+
+        session.execute(self.buildDeleteTable(sparkdf))
+
+        query = self.buildCreateTable(sparkdf)
+        session.execute(query)
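+        # Brief pause, presumably to let the schema change propagate before the Spark write.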
+        time.sleep(3)
+        cluster.shutdown()
+
+    def buildCreateTable(self, sparkdf):
+        """
+        Builds a simple Cassandra query for creating the table.
+        Column names follow the sparkdf headers; the added "__Id" column
+        is used as the table's primary key.
+        """
+        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+        if sparkdf is not None:
+            col_list = sparkdf.schema.names
+            # To maintain the column name case sensitivity
+            for col in col_list:
+                query = query + ' "' + str(col) + '"' + " text " + ","
+
+        # Creating __Id column as Primary Key
+        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+        self.logger.debug("Create table query " + query)
+        return query
+
+    def buildDeleteTable(self, sparkdf):
+        """
+        Builds a simple Cassandra query for dropping the table
+        """
+        query = "DROP TABLE IF EXISTS  " + self.tableName + " ;"
+        self.logger.debug("Delete table query " + query)
+        return query
+
+    def buildKeyspaceQuery(self, keyspace):
+        """
+        Builds the Cassandra query for creating the keyspace
+        """
+        query = (
+            "CREATE KEYSPACE IF NOT EXISTS "
+            + keyspace
+            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor'  : 2 };"
+        )
+        return query
index 30f48f7..a66b6ee 100644
@@ -1,65 +1,65 @@
-# ==================================================================================\r
-#\r
-#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.\r
-#\r
-#   Licensed under the Apache License, Version 2.0 (the "License");\r
-#   you may not use this file except in compliance with the License.\r
-#   You may obtain a copy of the License at\r
-#\r
-#          http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-#   Unless required by applicable law or agreed to in writing, software\r
-#   distributed under the License is distributed on an "AS IS" BASIS,\r
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-#   See the License for the specific language governing permissions and\r
-#   limitations under the License.\r
-#\r
-# ==================================================================================\r
-\r
-"""\r
-@ModuleName: Sink Default Class\r
-"""\r
-from sink.Base import Sink\r
-class DefaultSparkSink(Sink):\r
-    """\r
-    @Class: Default Spark Sink\r
-    """\r
-    def __init__(self, classflavour):\r
-        """\r
-        @Methond: Constructor\r
-        @Input : classflavor\r
-        """\r
-        self.ClassType="Default"\r
-        self.flavour = classflavour\r
-        self.logger = None\r
-        classconfig = None\r
-        sparkconfig  = None\r
-        self.sparkloadkey = None\r
-        self.writetype = None\r
-    def init(self, sparkhelper, confighelper, inputdict):\r
-        """\r
-        @Methond:init\r
-        @Inputs: sparkhelper\r
-                 confighelper\r
-                 inputdict\r
-        """\r
-        self.logger= confighelper.getLogger()\r
-        classconfig,sparkconfig  = confighelper.getClassConfig(self)\r
-        self.sparkloadkey=classconfig["SparkLoadKey"]\r
-        self.writetype=classconfig["WriteMode"]\r
-        for key in sparkconfig:\r
-            value = sparkconfig[key]\r
-            print("Spark Config",key,sparkconfig[key])\r
-            if value.startswith('$Input$'):\r
-                inputkey = value[7:]\r
-                sparkhelper.addConf(key,inputdict[inputkey])\r
-            else:\r
-                sparkhelper.addConf(key,value)\r
-    def write(self, sparksession, sparkdf):\r
-        """\r
-        @Method: write\r
-        @input : sparksession\r
-                 sparkdf\r
-        """\r
-        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()\r
-        self.logger.debug("Data written to Sink"+self.flavour)\r
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Sink Default Class
+"""
+from sink.Base import Sink
+
+
+class DefaultSparkSink(Sink):
+    """
+    @Class: Default Spark Sink
+    """
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input : classflavour
+        """
+        self.ClassType="Default"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig  = None
+        self.sparkloadkey = None
+        self.writetype = None
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config",key,sparkconfig[key])
+            if value.startswith('$Input$'):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: write
+        @Inputs: sparksession
+                 sparkdf
+        """
+        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()
+        self.logger.debug("Data written to Sink"+self.flavour)