Adding files for source funtionality for data extraction module 63/9263/1
authorjosephthaliath <jo.thaliath@samsung.com>
Fri, 14 Oct 2022 10:45:05 +0000 (16:15 +0530)
committerjosephthaliath <jo.thaliath@samsung.com>
Fri, 14 Oct 2022 10:46:29 +0000 (16:16 +0530)
Issue-Id: AIMLFW-2

Signed-off-by: josephthaliath <jo.thaliath@samsung.com>
Change-Id: I7d9b2ed7dc16bd1af5c4f898772fd5b8636fb6d4

src/source/CassandraSource.py [new file with mode: 0644]
src/source/DefaultSparkSource.py [new file with mode: 0644]
src/source/InfluxSource.py [new file with mode: 0644]
src/source/SourceClassConfig.ini [new file with mode: 0644]
src/source/__init__.py [new file with mode: 0644]

diff --git a/src/source/CassandraSource.py b/src/source/CassandraSource.py
new file mode 100644 (file)
index 0000000..7911011
--- /dev/null
@@ -0,0 +1,78 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Cassandra Spark Source
+"""
+from source.Base import Source
+import json
+from pyspark.sql import SQLContext, SparkSession
+
+class CassandraSource(Source):
+    """
+    @Class: Spark Cassandra Source
+    """
+    def __init__(self,classflavour):
+        """
+        @Method: Single Arg Constructor
+        """
+        self.ClassType="Custom"
+        self.flavour=classflavour
+        self.logger=None
+        self.sparkloadkey=None
+        
+    def init(self,sparkhelper,confighelper,inputdict):
+        """
+        @Method:init
+        @Inputs: sparkhelper, confighelper , inputdict
+        @Output: None
+        """
+        self.logger = confighelper.getLogger()
+        self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour)
+        classconfig,sparkconfig  = confighelper.getClassConfig(self)
+        envConf = confighelper.getEnvConfig()
+        
+        
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        self.keyspace  = envConf['cassandra_sourcedb']
+        self.table = envConf['cassandra_sourcetable']
+        
+        self.logger.debug("Source keyspace:" +  str(self.keyspace) + " table-" + str(self.table))
+        
+
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            if value.startswith('$Input$'):
+                inputkey = value[7:]
+                sparkhelper.addConf(key,inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key,value)
+            
+        self.logger.debug("Spark cassandra source initialized as" + self.flavour)
+
+    def load(self,sparksession):
+        """
+        @Method: load 
+        @Inputs: sparksession
+        @Output: dataframe
+            Fetches data from cassandra database
+        """
+        
+        df_data = sparksession.read.format(self.sparkloadkey).options(table=self.table,keyspace=self.keyspace).load()
+        self.logger.debug("Data Loaded from cassandra Source: " + self.keyspace + "." + self.table)
+        return df_data
diff --git a/src/source/DefaultSparkSource.py b/src/source/DefaultSparkSource.py
new file mode 100644 (file)
index 0000000..722942a
--- /dev/null
@@ -0,0 +1,62 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Spark Default Source
+"""
+from source.Base import Source
+
+class DefaultSparkSource(Source):
+    """
+    @Class: Spark Default Source
+    """
+    def __init__(self,classflavour):
+        """
+        @Method: Single Arg Constructor
+        """
+        self.ClassType="Default"
+        self.flavour=classflavour
+        self.logger=None
+        self.sparkloadkey=None
+    def init(self,sparkhelper,confighelper,inputdict):
+        """
+        @Method:Init
+        @Inputs: sparkhelper, confighelper , inputdict
+        @Output: none
+        """
+        self.logger = confighelper.getLogger()
+        classconfig,sparkconfig  = confighelper.getClassConfig(self)
+        self.logger.debug("Set Class Spark Source Flavor"+self.flavour)
+        self.sparkloadkey=classconfig["SparkLoadKey"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            if value.startswith('$Input$'):
+                inputkey = value[7:]
+                sparkhelper.addConf(key,inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key,value)
+        self.logger.debug("Spark Default Source Initialized as"+self.flavour)
+    def load(self,sparksession):
+        """
+        @Method: load
+        @Inputs: sparksession
+        @Output: dataframe
+        """
+        df_data = sparksession.read.format(self.sparkloadkey).load()
+        self.logger.debug("Data Loaded from Source"+ self.flavour)
+        return df_data
diff --git a/src/source/InfluxSource.py b/src/source/InfluxSource.py
new file mode 100644 (file)
index 0000000..87c8965
--- /dev/null
@@ -0,0 +1,98 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module : Influx DB Source
+"""
+from influxdb_client import InfluxDBClient, Point, Dialect
+from influxdb_client.client.write_api import SYNCHRONOUS
+import pandas as pd
+from source.Base import Source
+
+class InfluxSource(Source):
+    """
+    @Class : Influx DB Source
+    """
+    def __init__(self):
+        """
+        @No Args Constructore
+        """
+        super().__init__()
+        self.logger  = None
+        self.classconfig  = None
+        self.url     = None
+        self.token   = None
+        self.org     = None
+        self.timeout = None
+        self.ssl     = None
+        self.query = None
+        
+        
+    def __init__(self,classflavour):
+        """
+        @Method: Single Arg Constructor
+        """
+        self.ClassType="Custom"
+        self.flavour=classflavour
+        self.logger  = None
+        self.classconfig  = None
+        self.url     = None
+        self.token   = None
+        self.org     = None
+        self.timeout = None
+        self.ssl     = None
+        self.query = None
+    
+    def init(self, sparkhelper, confighelper, inputdict):
+        """
+        @Method: init
+        @Input : spark - Spark
+                 confighelper - config helper
+                 inputdict - input dictionary
+        @Output: None
+        """
+        self.logger  = confighelper.getLogger()
+        classconfig  = confighelper.getClassConfig(self)
+        self.url     = classconfig["url"]
+        self.token   = classconfig["token"]
+        self.org     = classconfig["org"]
+        self.timeout = classconfig["timeout"]
+        self.ssl     = classconfig["ssl"]
+        self.query   = classconfig["Query"]
+        if "query" in inputdict.keys():
+            self.query = inputdict["query"]
+            self.logger.debug("Set variable query"+ self.query)
+        else:
+            self.logger.error("Expected variable query"+ self.query)
+    def load(self,sparksession):
+        """
+        @Method: init
+        @Input : spark - Spark
+        @Output: spark dataframe
+        """
+        client = InfluxDBClient(url=self.url,\
+                                token = self.token,org=self.org,\
+                                timeout=self.timeout,\
+                                verify_ssl=self.ssl)
+        query_api = client.query_api()
+        self.logger.debug("Started Data Extraction for Influx Source "+self.query)
+        df_data = query_api.query_data_frame(self.query)
+        spark_df = sparksession.createDataFrame(df_data)
+        client.close()
+        self.logger.debug("Load Data Completed for Influx Source")
+        return spark_df
diff --git a/src/source/SourceClassConfig.ini b/src/source/SourceClassConfig.ini
new file mode 100644 (file)
index 0000000..5e44f6a
--- /dev/null
@@ -0,0 +1,50 @@
+[InfluxSource]
+Name=Influxdb
+Description= Spark connector that Supports Reading from InfluxDB
+ClassType=Custom
+url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
+token = $ENV{Influx_Token}
+org = $ENV{Influx_DBHeirarchyL1}
+timeout = 100000
+ssl = False
+Query = $Input$query
+
+[InfluxSource.Inputs]
+query=this query for influx db
+
+[CassandraSource]
+Name=Cassandradb
+Description= Spark connector that Supports Reading from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+
+[CassandraSource.SparkConf]
+spark.cassandra.connection.host=$ENV{Cassandra_SourceIP}
+spark.cassandra.connection.port=$ENV{Cassandra_SourcePort}
+spark.cassandra.auth.username=$ENV{Cassandra_SourceUser}
+spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword}
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[ModuleDetails]
+ModuleName=source
+BaseClassName=Source
+DefaultClassName=DefaultSparkSource
+
+[EnvConfig]
+Influx_SourceIP=Influx_DATALAKE_IP
+Influx_SourcePort=Influx_DATALAKE_PORT
+Influx_MonSourcePort=Influx_DATALAKE_PORT
+Influx_DBHeirarchyL1=Influx_ORG_NAME
+Influx_DBHeirarchyL2=Influx_BUCKET_NAME
+Influx_Token=Influx_Token
+
+Cassandra_SourceIP=Cassandra_DATALAKE_IP
+Cassandra_SourcePort=Cassandra_DATALAKE_PORT
+Cassandra_SourceUser=Cassandra_DATALAKE_USER
+Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD
+Cassandra_SourceDB=Cassandra_KEYSPACE_NAME
+Cassandra_SourceTable=Cassandra_TABLE_NAME
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
\ No newline at end of file
diff --git a/src/source/__init__.py b/src/source/__init__.py
new file mode 100644 (file)
index 0000000..e69de29