--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Cassandra Spark Source
+"""
+from source.Base import Source
+import json
+from pyspark.sql import SQLContext, SparkSession
+
+class CassandraSource(Source):
+    """
+    @Class: Spark Cassandra Source
+
+    Loads a Cassandra table into a Spark dataframe via the
+    spark-cassandra connector (reader format taken from class config).
+    """
+    def __init__(self,classflavour):
+        """
+        @Method: Single Arg Constructor
+        @Inputs: classflavour - name this source flavour is registered under
+        """
+        # "Custom" marks this as a non-default source implementation
+        self.ClassType="Custom"
+        self.flavour=classflavour
+        # logger and spark load key are populated later by init()
+        self.logger=None
+        self.sparkloadkey=None
+
+    def init(self,sparkhelper,confighelper,inputdict):
+        """
+        @Method:init
+        @Inputs: sparkhelper, confighelper , inputdict
+        @Output: None
+
+        Resolves class/spark/env configuration and pushes the connector
+        settings onto the spark helper before the session is built.
+        """
+        self.logger = confighelper.getLogger()
+        self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour)
+        classconfig,sparkconfig = confighelper.getClassConfig(self)
+        envConf = confighelper.getEnvConfig()
+
+
+        # DataFrameReader format string, e.g. "org.apache.spark.sql.cassandra"
+        self.sparkloadkey = classconfig["SparkLoadKey"]
+        # NOTE(review): keys are read lower-cased while [EnvConfig] declares
+        # "Cassandra_SourceDB" - presumably configparser lower-cases option
+        # names; confirm against the config helper implementation.
+        self.keyspace = envConf['cassandra_sourcedb']
+        self.table = envConf['cassandra_sourcetable']
+
+        self.logger.debug("Source keyspace:" + str(self.keyspace) + " table-" + str(self.table))
+
+
+        # Forward every spark conf entry; values of the form "$Input$<key>"
+        # are resolved from inputdict at runtime.
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            if value.startswith('$Input$'):
+                # strip the 7-char "$Input$" prefix to get the input key
+                inputkey = value[7:]
+                sparkhelper.addConf(key,inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key,value)
+
+        self.logger.debug("Spark cassandra source initialized as" + self.flavour)
+
+    def load(self,sparksession):
+        """
+        @Method: load
+        @Inputs: sparksession
+        @Output: dataframe
+        Fetches data from cassandra database
+
+        Reads self.keyspace/self.table through the connector format
+        configured in init().
+        """
+
+        df_data = sparksession.read.format(self.sparkloadkey).options(table=self.table,keyspace=self.keyspace).load()
+        self.logger.debug("Data Loaded from cassandra Source: " + self.keyspace + "." + self.table)
+        return df_data
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module: Spark Default Source
+"""
+from source.Base import Source
+
+class DefaultSparkSource(Source):
+    """
+    @Class: Spark Default Source
+
+    Fallback source: loads data through a plain DataFrameReader whose
+    format string comes from class config; all connection details are
+    expected to arrive via spark conf entries.
+    """
+    def __init__(self,classflavour):
+        """
+        @Method: Single Arg Constructor
+        @Inputs: classflavour - name this source flavour is registered under
+        """
+        # "Default" marks this as the catch-all source implementation
+        self.ClassType="Default"
+        self.flavour=classflavour
+        # logger and spark load key are populated later by init()
+        self.logger=None
+        self.sparkloadkey=None
+    def init(self,sparkhelper,confighelper,inputdict):
+        """
+        @Method:Init
+        @Inputs: sparkhelper, confighelper , inputdict
+        @Output: none
+
+        Reads class config and forwards spark conf entries, resolving
+        "$Input$<key>" placeholders from inputdict.
+        """
+        self.logger = confighelper.getLogger()
+        classconfig,sparkconfig = confighelper.getClassConfig(self)
+        self.logger.debug("Set Class Spark Source Flavor"+self.flavour)
+        # DataFrameReader format string used by load()
+        self.sparkloadkey=classconfig["SparkLoadKey"]
+        for key in sparkconfig:
+            value = sparkconfig[key]
+            if value.startswith('$Input$'):
+                # strip the 7-char "$Input$" prefix to get the input key
+                inputkey = value[7:]
+                sparkhelper.addConf(key,inputdict[inputkey])
+            else:
+                sparkhelper.addConf(key,value)
+        self.logger.debug("Spark Default Source Initialized as"+self.flavour)
+    def load(self,sparksession):
+        """
+        @Method: load
+        @Inputs: sparksession
+        @Output: dataframe
+
+        Loads with no reader options; the format is expected to pick up
+        everything it needs from the spark conf set in init().
+        """
+        df_data = sparksession.read.format(self.sparkloadkey).load()
+        self.logger.debug("Data Loaded from Source"+ self.flavour)
+        return df_data
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+"""
+@Module : Influx DB Source
+"""
+from influxdb_client import InfluxDBClient, Point, Dialect
+from influxdb_client.client.write_api import SYNCHRONOUS
+import pandas as pd
+from source.Base import Source
+
+class InfluxSource(Source):
+ """
+ @Class : Influx DB Source
+ """
+ def __init__(self):
+ """
+ @No Args Constructore
+ """
+ super().__init__()
+ self.logger = None
+ self.classconfig = None
+ self.url = None
+ self.token = None
+ self.org = None
+ self.timeout = None
+ self.ssl = None
+ self.query = None
+
+
+ def __init__(self,classflavour):
+ """
+ @Method: Single Arg Constructor
+ """
+ self.ClassType="Custom"
+ self.flavour=classflavour
+ self.logger = None
+ self.classconfig = None
+ self.url = None
+ self.token = None
+ self.org = None
+ self.timeout = None
+ self.ssl = None
+ self.query = None
+
+ def init(self, sparkhelper, confighelper, inputdict):
+ """
+ @Method: init
+ @Input : spark - Spark
+ confighelper - config helper
+ inputdict - input dictionary
+ @Output: None
+ """
+ self.logger = confighelper.getLogger()
+ classconfig = confighelper.getClassConfig(self)
+ self.url = classconfig["url"]
+ self.token = classconfig["token"]
+ self.org = classconfig["org"]
+ self.timeout = classconfig["timeout"]
+ self.ssl = classconfig["ssl"]
+ self.query = classconfig["Query"]
+ if "query" in inputdict.keys():
+ self.query = inputdict["query"]
+ self.logger.debug("Set variable query"+ self.query)
+ else:
+ self.logger.error("Expected variable query"+ self.query)
+ def load(self,sparksession):
+ """
+ @Method: init
+ @Input : spark - Spark
+ @Output: spark dataframe
+ """
+ client = InfluxDBClient(url=self.url,\
+ token = self.token,org=self.org,\
+ timeout=self.timeout,\
+ verify_ssl=self.ssl)
+ query_api = client.query_api()
+ self.logger.debug("Started Data Extraction for Influx Source "+self.query)
+ df_data = query_api.query_data_frame(self.query)
+ spark_df = sparksession.createDataFrame(df_data)
+ client.close()
+ self.logger.debug("Load Data Completed for Influx Source")
+ return spark_df
--- /dev/null
+[InfluxSource]
+Name=Influxdb
+Description=Spark connector that supports reading from InfluxDB
+ClassType=Custom
+url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort}
+token = $ENV{Influx_Token}
+org = $ENV{Influx_DBHeirarchyL1}
+timeout = 100000
+ssl = False
+Query = $Input$query
+
+[InfluxSource.Inputs]
+query=this query for influx db
+
+[CassandraSource]
+Name=Cassandradb
+Description=Spark connector that supports reading from CassandraDB
+ClassType=Custom
+SparkLoadKey=org.apache.spark.sql.cassandra
+
+[CassandraSource.SparkConf]
+spark.cassandra.connection.host=$ENV{Cassandra_SourceIP}
+spark.cassandra.connection.port=$ENV{Cassandra_SourcePort}
+spark.cassandra.auth.username=$ENV{Cassandra_SourceUser}
+spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword}
+spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel}
+spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1
+
+[CassandraSource.Inputs]
+
+[ModuleDetails]
+ModuleName=source
+BaseClassName=Source
+DefaultClassName=DefaultSparkSource
+
+[EnvConfig]
+Influx_SourceIP=Influx_DATALAKE_IP
+Influx_SourcePort=Influx_DATALAKE_PORT
+Influx_MonSourcePort=Influx_DATALAKE_PORT
+Influx_DBHeirarchyL1=Influx_ORG_NAME
+Influx_DBHeirarchyL2=Influx_BUCKET_NAME
+Influx_Token=Influx_Token
+
+Cassandra_SourceIP=Cassandra_DATALAKE_IP
+Cassandra_SourcePort=Cassandra_DATALAKE_PORT
+Cassandra_SourceUser=Cassandra_DATALAKE_USER
+Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD
+Cassandra_SourceDB=Cassandra_KEYSPACE_NAME
+Cassandra_SourceTable=Cassandra_TABLE_NAME
+Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL
\ No newline at end of file