From: josephthaliath Date: Fri, 14 Oct 2022 10:45:05 +0000 (+0530) Subject: Adding files for source funtionality for data extraction module X-Git-Tag: 1.0.0~15 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=57da27b8609649938369e59bc7f3533662cd58fa;p=aiml-fw%2Fathp%2Fdata-extraction.git Adding files for source funtionality for data extraction module Issue-Id: AIMLFW-2 Signed-off-by: josephthaliath Change-Id: I7d9b2ed7dc16bd1af5c4f898772fd5b8636fb6d4 --- diff --git a/src/source/CassandraSource.py b/src/source/CassandraSource.py new file mode 100644 index 0000000..7911011 --- /dev/null +++ b/src/source/CassandraSource.py @@ -0,0 +1,78 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +""" +@Module: Cassandra Spark Source +""" +from source.Base import Source +import json +from pyspark.sql import SQLContext, SparkSession + +class CassandraSource(Source): + """ + @Class: Spark Cassandra Source + """ + def __init__(self,classflavour): + """ + @Method: Single Arg Constructor + """ + self.ClassType="Custom" + self.flavour=classflavour + self.logger=None + self.sparkloadkey=None + + def init(self,sparkhelper,confighelper,inputdict): + """ + @Method:init + @Inputs: sparkhelper, confighelper , inputdict + @Output: None + """ + self.logger = confighelper.getLogger() + self.logger.debug("Set Cassandra Class Spark Source Flavor: " + self.flavour) + classconfig,sparkconfig = confighelper.getClassConfig(self) + envConf = confighelper.getEnvConfig() + + + self.sparkloadkey = classconfig["SparkLoadKey"] + self.keyspace = envConf['cassandra_sourcedb'] + self.table = envConf['cassandra_sourcetable'] + + self.logger.debug("Source keyspace:" + str(self.keyspace) + " table-" + str(self.table)) + + + for key in sparkconfig: + value = sparkconfig[key] + if value.startswith('$Input$'): + inputkey = value[7:] + sparkhelper.addConf(key,inputdict[inputkey]) + else: + sparkhelper.addConf(key,value) + + self.logger.debug("Spark cassandra source initialized as" + self.flavour) + + def load(self,sparksession): + """ + @Method: load + @Inputs: sparksession + @Output: dataframe + Fetches data from cassandra database + """ + + df_data = sparksession.read.format(self.sparkloadkey).options(table=self.table,keyspace=self.keyspace).load() + self.logger.debug("Data Loaded from cassandra Source: " + self.keyspace + "." + self.table) + return df_data diff --git a/src/source/DefaultSparkSource.py b/src/source/DefaultSparkSource.py new file mode 100644 index 0000000..722942a --- /dev/null +++ b/src/source/DefaultSparkSource.py @@ -0,0 +1,62 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +""" +@Module: Spark Default Source +""" +from source.Base import Source + +class DefaultSparkSource(Source): + """ + @Class: Spark Default Source + """ + def __init__(self,classflavour): + """ + @Method: Single Arg Constructor + """ + self.ClassType="Default" + self.flavour=classflavour + self.logger=None + self.sparkloadkey=None + def init(self,sparkhelper,confighelper,inputdict): + """ + @Method:Init + @Inputs: sparkhelper, confighelper , inputdict + @Output: none + """ + self.logger = confighelper.getLogger() + classconfig,sparkconfig = confighelper.getClassConfig(self) + self.logger.debug("Set Class Spark Source Flavor"+self.flavour) + self.sparkloadkey=classconfig["SparkLoadKey"] + for key in sparkconfig: + value = sparkconfig[key] + if value.startswith('$Input$'): + inputkey = value[7:] + sparkhelper.addConf(key,inputdict[inputkey]) + else: + sparkhelper.addConf(key,value) + self.logger.debug("Spark Default Source Initialized as"+self.flavour) + def load(self,sparksession): + """ + @Method: load + @Inputs: sparksession + @Output: dataframe + """ + df_data = sparksession.read.format(self.sparkloadkey).load() + self.logger.debug("Data Loaded from Source"+ self.flavour) + return df_data diff --git a/src/source/InfluxSource.py b/src/source/InfluxSource.py new file mode 100644 index 0000000..87c8965 --- /dev/null +++ b/src/source/InfluxSource.py @@ -0,0 +1,98 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +""" +@Module : Influx DB Source +""" +from influxdb_client import InfluxDBClient, Point, Dialect +from influxdb_client.client.write_api import SYNCHRONOUS +import pandas as pd +from source.Base import Source + +class InfluxSource(Source): + """ + @Class : Influx DB Source + """ + def __init__(self): + """ + @No Args Constructore + """ + super().__init__() + self.logger = None + self.classconfig = None + self.url = None + self.token = None + self.org = None + self.timeout = None + self.ssl = None + self.query = None + + + def __init__(self,classflavour): + """ + @Method: Single Arg Constructor + """ + self.ClassType="Custom" + self.flavour=classflavour + self.logger = None + self.classconfig = None + self.url = None + self.token = None + self.org = None + self.timeout = None + self.ssl = None + self.query = None + + def init(self, sparkhelper, confighelper, inputdict): + """ + @Method: init + @Input : spark - Spark + confighelper - config helper + inputdict - input dictionary + @Output: None + """ + self.logger = confighelper.getLogger() + classconfig = confighelper.getClassConfig(self) + self.url = classconfig["url"] + self.token = classconfig["token"] + self.org = classconfig["org"] + self.timeout = classconfig["timeout"] + self.ssl = classconfig["ssl"] + self.query = classconfig["Query"] + if "query" in inputdict.keys(): + self.query = inputdict["query"] + self.logger.debug("Set variable query"+ self.query) + else: + self.logger.error("Expected variable query"+ self.query) + def load(self,sparksession): + """ + @Method: init + @Input : spark - Spark + @Output: spark dataframe + """ + client = InfluxDBClient(url=self.url,\ + token = self.token,org=self.org,\ + timeout=self.timeout,\ + verify_ssl=self.ssl) + query_api = client.query_api() + self.logger.debug("Started Data Extraction for Influx Source "+self.query) + df_data = query_api.query_data_frame(self.query) + spark_df = sparksession.createDataFrame(df_data) + client.close() + self.logger.debug("Load Data Completed for Influx Source") + return spark_df diff --git a/src/source/SourceClassConfig.ini b/src/source/SourceClassConfig.ini new file mode 100644 index 0000000..5e44f6a --- /dev/null +++ b/src/source/SourceClassConfig.ini @@ -0,0 +1,50 @@ +[InfluxSource] +Name=Influxdb +Description= Spark connector that Supports Reading from InfluxDB +ClassType=Custom +url = http://$ENV{Influx_SourceIP}:$ENV{Influx_SourcePort} +token = $ENV{Influx_Token} +org = $ENV{Influx_DBHeirarchyL1} +timeout = 100000 +ssl = False +Query = $Input$query + +[InfluxSource.Inputs] +query=this query for influx db + +[CassandraSource] +Name=Cassandradb +Description= Spark connector that Supports Reading from CassandraDB +ClassType=Custom +SparkLoadKey=org.apache.spark.sql.cassandra + +[CassandraSource.SparkConf] +spark.cassandra.connection.host=$ENV{Cassandra_SourceIP} +spark.cassandra.connection.port=$ENV{Cassandra_SourcePort} +spark.cassandra.auth.username=$ENV{Cassandra_SourceUser} +spark.cassandra.auth.password=$ENV{Cassandra_SourcePassword} +spark.cassandra.output.consistency.level=$ENV{Cassandra_CosistencyLevel} +spark.jars.packages=com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 + +[CassandraSource.Inputs] + +[ModuleDetails] +ModuleName=source +BaseClassName=Source +DefaultClassName=DefaultSparkSource + +[EnvConfig] +Influx_SourceIP=Influx_DATALAKE_IP +Influx_SourcePort=Influx_DATALAKE_PORT +Influx_MonSourcePort=Influx_DATALAKE_PORT +Influx_DBHeirarchyL1=Influx_ORG_NAME +Influx_DBHeirarchyL2=Influx_BUCKET_NAME +Influx_Token=Influx_Token + +Cassandra_SourceIP=Cassandra_DATALAKE_IP +Cassandra_SourcePort=Cassandra_DATALAKE_PORT +Cassandra_SourceUser=Cassandra_DATALAKE_USER +Cassandra_SourcePassword=Cassandra_DATALAKE_PASSWORD +Cassandra_SourceDB=Cassandra_KEYSPACE_NAME +Cassandra_SourceTable=Cassandra_TABLE_NAME +Cassandra_CosistencyLevel=Cassandra_CONS_LEVEL \ No newline at end of file diff --git a/src/source/__init__.py b/src/source/__init__.py new file mode 100644 index 0000000..e69de29