From: SANDEEP KUMAR JAISAWAL
Date: Thu, 13 Oct 2022 05:42:26 +0000 (+0530)
Subject: Adding files for sink functionality for data extraction module
X-Git-Tag: 1.0.0~17^2
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=99964835ae9e405edbc9d7cea5347d99ea5a1c6a;p=aiml-fw%2Fathp%2Fdata-extraction.git

Adding files for sink functionality for data extraction module

Issue-Id: AIMLFW-2
Signed-off-by: SANDEEP KUMAR JAISAWAL
Change-Id: Ia4b9c1af3878f9b5eae79528610ee5c65d88108f
---
diff --git a/src/sink/CassandraSink.py b/src/sink/CassandraSink.py
new file mode 100644
index 0000000..e095d67
--- /dev/null
+++ b/src/sink/CassandraSink.py
@@ -0,0 +1,185 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Cassandra Sink Class
+"""
+from sink.Base import Sink
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SQLContext, SparkSession
+from pyspark.sql.types import *
+from cassandra.cluster import Cluster
+from cassandra.auth import PlainTextAuthProvider
+import time
+import json
+from pyspark.sql.functions import monotonically_increasing_id
+
+
+class CassandraSink(Sink):
+    """
+    Writes the queried data into a Cassandra table.
+    It creates the table with the given trainingjob name.
+    It creates the keyspace if it doesn't exist.
+    @Class: Cassandra Sink
+    """
+
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input : classflavour
+        """
+        self.ClassType = "Custom"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig = None
+        self.sparkloadkey = None
+        self.writetype = None
+        self.tableName = None
+        self.keyspace = None
+        self.user = None
+        self.passw = None
+        self.host = None
+        self.port = None
+
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+
+        self.tableName = inputdict["CollectionName"]
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        envConf = confighelper.getEnvConfig()
+
+        self.keyspace = envConf["cassandra_sinkdb"]
+        self.host = envConf["fs_db_ip"]
+        self.port = envConf["fs_db_port"]
+        self.user = envConf["fs_db_user"]
+        self.passw = envConf["fs_db_password"]
+
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+
+        self.logger.debug(
+            "Sink keyspace-" + str(self.keyspace) + " table-" + str(self.tableName)
+        )
+
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config", key, sparkconfig[key])
+            if value.startswith("$Input$"):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: write; writes the data from sparkdf into the Cassandra table
+        @input : sparksession
+                 sparkdf
+        """
+        self.logger.debug("Data writing to Sink " + self.flavour)
+        self.create_table(sparkdf)
+        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
+
+        write_options = {
+            "table": self.tableName,
+            "keyspace": self.keyspace,
+            "spark.cassandra.connection.host": self.host,
+            "spark.cassandra.connection.port": self.port,
+            "spark.cassandra.auth.username": self.user,
+            "spark.cassandra.auth.password": self.passw,
+        }
+
+        sparkdf.write.format(self.sparkloadkey).options(**write_options).mode(
+            "append"
+        ).save()
+
+        self.logger.debug(
+            "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
+        )
+
+    def create_table(self, sparkdf):
+        """
+        Recreates the table in the Cassandra DB as per the trainingjob name.
+        Creates the keyspace if it doesn't exist.
+        """
+        self.logger.debug("Creating table...")
+
+        auth_provider = PlainTextAuthProvider(username=self.user, password=self.passw)
+        cluster = Cluster([self.host], port=self.port, auth_provider=auth_provider)
+
+        session_default = cluster.connect()
+        create_ks_query = self.buildKeyspaceQuery(self.keyspace)
+        session_default.execute(create_ks_query)
+
+        session = cluster.connect(self.keyspace)
+
+        session.execute(self.buildDeleteTable(sparkdf))
+
+        query = self.buildCreateTable(sparkdf)
+        session.execute(query)
+        time.sleep(3)
+        cluster.shutdown()
+
+    def buildCreateTable(self, sparkdf):
+        """
+        Builds a simple Cassandra CREATE TABLE query. Column names are taken from
+        the sparkdf headers, and the generated __Id column is used as the
+        table's primary key.
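+        e.g. (illustrative, for a hypothetical trainingjob "job1" with one column "cellId"):
+            CREATE TABLE job1 ( "__Id" BIGINT, "cellId" text ,PRIMARY KEY("__Id"));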
+        """
+        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+        if sparkdf is not None:
+            col_list = sparkdf.schema.names
+            # Quote the column names to preserve their case in Cassandra
+            for col in col_list:
+                query = query + ' "' + str(col) + '"' + " text " + ","
+
+        # Use the generated __Id column as the primary key
+        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+        self.logger.debug("Create table query " + query)
+        return query
+
+    def buildDeleteTable(self, sparkdf):
+        """
+        Builds a simple Cassandra query for dropping the table.
+        """
+        query = "DROP TABLE IF EXISTS " + self.tableName + " ;"
+        self.logger.debug("Delete table query " + query)
+        return query
+
+    def buildKeyspaceQuery(self, keyspace):
+        """
+        Builds a Cassandra query for creating the keyspace.
+        """
+        query = (
+            "CREATE KEYSPACE IF NOT EXISTS "
+            + keyspace
+            + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"
+        )
+        return query
diff --git a/src/sink/DefaultSparkSink.py b/src/sink/DefaultSparkSink.py
new file mode 100644
index 0000000..30f48f7
--- /dev/null
+++ b/src/sink/DefaultSparkSink.py
@@ -0,0 +1,65 @@
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@ModuleName: Sink Default Class
+"""
+from sink.Base import Sink
+class DefaultSparkSink(Sink):
+    """
+    @Class: Default Spark Sink
+    """
+    def __init__(self, classflavour):
+        """
+        @Method: Constructor
+        @Input : classflavour
+        """
+        self.ClassType = "Default"
+        self.flavour = classflavour
+        self.logger = None
+        classconfig = None
+        sparkconfig = None
+        self.sparkloadkey = None
+        self.writetype = None
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Inputs: sparkhelper
+                 confighelper
+                 inputdict
+        """
+        self.logger = confighelper.getLogger()
+        classconfig, sparkconfig = confighelper.getClassConfig(self)
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.writetype = classconfig["WriteMode"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            print("Spark Config", key, sparkconfig[key])
+            if value.startswith('$Input$'):
+                inputkey = value[7:]
+                sparkhelper.addConf(key, inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key, value)
+    def write(self, sparksession, sparkdf):
+        """
+        @Method: write
+        @input : sparksession
+                 sparkdf
+        """
+        sparkdf.write.format(self.sparkloadkey).mode(self.writetype).save()
+        self.logger.debug("Data written to Sink " + self.flavour)
diff --git a/src/sink/SinkClassConfig.ini b/src/sink/SinkClassConfig.ini
new file mode 100644
index 0000000..7a12e9a
--- /dev/null
+++ b/src/sink/SinkClassConfig.ini
@@ -0,0 +1,36 @@
+[ModuleDetails]
+ModuleName=sink
+BaseClassName=Sink
+DefaultClassName=DefaultSparkSink
+
+[CassandraSink]
+Name=CassandraDB
+Description=Spark connector that supports writing to CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+WriteMode=overwrite
+
+[CassandraSink.SparkConf]
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSink.Inputs]
+
+[EnvConfig]
+FS_DB_USER=FS_DB_USER
+FS_DB_PASSWORD=FS_DB_PASSWORD
+FS_DB_IP=FS_DB_IP
+FS_DB_PORT=FS_DB_PORT
+FS_DB_NAME=FS_DB_KEYSPACE_NAME
+
+
+Cassandra_SinkIP=FS_DB_IP
+Cassandra_SinkPort=FS_DB_PORT
+Cassandra_SinkUser=FS_DB_USER
+Cassandra_SinkPassword=FS_DB_PASSWORD
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
+
+Cassandra_SinkDB=FS_DB_KEYSPACE_NAME
+
+
+
diff --git a/src/sink/__init__.py b/src/sink/__init__.py
new file mode 100644
index 0000000..e69de29
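
Usage sketch (illustrative, not part of the patch): the sinks are driven through a
sparkhelper/confighelper pair that lives elsewhere in the data-extraction module and is
not included in this change, so the snippet below stubs both out with minimal stand-ins.
All names and values here (StubSparkHelper, StubConfigHelper, "job1", the demo host and
credentials) are assumptions for illustration only.

    # Illustrative driver for CassandraSink; assumes a reachable Cassandra instance.
    import logging
    from pyspark.sql import SparkSession
    from sink.CassandraSink import CassandraSink

    class StubSparkHelper:
        """Collects the Spark conf entries the sink registers (assumed interface)."""
        def __init__(self):
            self.conf = {}
        def addConf(self, key, value):
            self.conf[key] = value

    class StubConfigHelper:
        """Hard-coded config in the shape CassandraSink.init() expects."""
        def getLogger(self):
            return logging.getLogger("sink-demo")
        def getClassConfig(self, obj):
            classconfig = {"SparkLoadKey": "org.apache.spark.sql.cassandra",
                           "WriteMode": "overwrite"}
            sparkconfig = {}  # e.g. spark.cassandra.output.consistency.level
            return classconfig, sparkconfig
        def getEnvConfig(self):
            return {"cassandra_sinkdb": "demo_ks", "fs_db_ip": "127.0.0.1",
                    "fs_db_port": 9042, "fs_db_user": "cassandra",
                    "fs_db_password": "cassandra"}

    sink = CassandraSink("CassandraDB")
    sink.init(StubSparkHelper(), StubConfigHelper(), {"CollectionName": "job1"})

    spark = (SparkSession.builder.appName("sink-demo")
             .config("spark.jars.packages",
                     "com.datastax.spark:spark-cassandra-connector_2.12:3.0.1")
             .getOrCreate())
    df = spark.createDataFrame([("cell1", "10"), ("cell2", "20")], ["cellId", "load"])
    sink.write(spark, df)  # drops/recreates demo_ks.job1, then appends the rows

Note that CassandraSink implements its overwrite semantics itself: create_table() drops
and recreates the target table on every write, after which the Spark write runs in
"append" mode, so the WriteMode=overwrite value from the ini is not passed to Spark.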