From: Sungjin Lee
Date: Tue, 1 Oct 2024 13:30:29 +0000 (+0000)
Subject: Add a Cassandra clustering key for data extraction
X-Git-Tag: k-release~4^2
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=16079abbc74f1badffb307c5fe7dea9d13ff4714;p=aiml-fw%2Fathp%2Fdata-extraction.git

Add a Cassandra clustering key for data extraction

- Add a partition key
- Use Cassandra key settings to enable data sorting and facilitate proper time-series model training
- Use _Id as the clustering key
- Unify the key column name
- Add a lit mock to test_cassandrasink.py

Issue-ID: AIMLFW-148
Change-Id: Ia624d331fc625e126075a3766132bfc07bf1297a
Signed-off-by: Sungjin Lee
---

diff --git a/Dockerfile b/Dockerfile
index ffba2aa..64f1aa8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -25,17 +25,17 @@ RUN apt-get update && apt-get install -y \
     python3 && apt-get install -y \
     python3-pip
 
-# Install OpenJDK-8 for Spark
+# Install OpenJDK-11 for Spark
 RUN apt-get update && \
-    apt-get install openjdk-8-jre openjdk-8-jdk -y && \
+    apt-get install openjdk-11-jre openjdk-11-jdk -y && \
     apt-get clean;
- 
+
 RUN apt-get update && \
     apt-get install scala -y && \
     apt-get clean;
- 
+
 # Setup JAVA_HOME -- useful for docker commandline
-ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
+ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/
 RUN export JAVA_HOME
 
 WORKDIR ${TA_DIR}
diff --git a/dataextraction/sink/CassandraSink.py b/dataextraction/sink/CassandraSink.py
index 1393eed..e8aab1f 100644
--- a/dataextraction/sink/CassandraSink.py
+++ b/dataextraction/sink/CassandraSink.py
@@ -27,7 +27,7 @@ from cassandra.cluster import Cluster
 from cassandra.auth import PlainTextAuthProvider
 import time
 import json
-from pyspark.sql.functions import monotonically_increasing_id
+from pyspark.sql.functions import lit, monotonically_increasing_id
 
 
 class CassandraSink(Sink):
@@ -98,10 +98,14 @@ class CassandraSink(Sink):
         """
         self.logger.debug("Data writing to Sink " + self.flavour)
         self.create_table(sparkdf)
-        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
-
-
-
+
+        # Set _partition_key to '1' to store all data in the same partition
+        sparkdf = sparkdf.select("*") \
+            .withColumn("_partition_key", lit('1')) \
+            .withColumn("_Id", monotonically_increasing_id())
+
+
+
         write_options = {
             "table": self.tableName,
             "keyspace": self.keyspace,
@@ -118,7 +122,7 @@
         self.logger.debug(
             "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
         )
-        
+
 
 
     def create_table(self, sparkdf):
@@ -136,10 +140,10 @@
         session_default.execute(create_ks_query)
 
         session = cluster.connect(self.keyspace)
-        
-        
+
+
         session.execute(self.buildDeleteTable())
-        
+
         query = self.buildCreateTable(sparkdf)
         session.execute(query)
         time.sleep(3)
@@ -148,18 +152,19 @@
     def buildCreateTable(self, sparkdf):
         """ Builds simple cassandra query for creating table.
-        Columns names as per sparkdf headers, choosing first header __Id in sparkdf
-        as primary key for table
+        Column names as per sparkdf headers,
+        choosing _partition_key in sparkdf as partition key,
+        _Id as clustering key for table
         """
-        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+        query = "CREATE TABLE " + self.tableName + ' ( "_partition_key" text, "_Id" bigint, '
 
         if sparkdf is not None:
             col_list = sparkdf.schema.names
 
             # To maintain the column name case sensitivity
             for col in col_list:
-                query = query + ' "' + str(col) + '"' + " text " + ","
+                query = query + ' "' + str(col) + '" text ' + ","
 
-        # Creating __Id column as Primary Key
-        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+        # Creating _partition_key column as Partition Key, _Id as Clustering Key
+        query = query + 'PRIMARY KEY (("_partition_key"), "_Id"));'
         self.logger.debug("Create table query " + query)
         return query
diff --git a/requirements.txt b/requirements.txt
index 06461e1..3f41422 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
-pyspark==3.1.2
+pyspark==3.5.3
 cassandra-driver==3.25.0
 json5==0.9.6
-influxdb-client==1.20.0
+influxdb-client==1.46.0
 pandas==2.2.2
 importlib-metadata==4.8.1
 lru-dict==1.1.7
diff --git a/test/test_cassandrasink.py b/test/test_cassandrasink.py
index 17e6cad..5aee4e4 100644
--- a/test/test_cassandrasink.py
+++ b/test/test_cassandrasink.py
@@ -23,8 +23,8 @@ from mock import patch
 
 # In order to import dataextraction functions
 sys.path.extend(["dataextraction/"])
-os.environ['CODE_DIR_PATH']='test' 
-load_dotenv('test/test_env.env')  # Loading Env Variables
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
 
 from sink.CassandraSink import CassandraSink
 from ConfigHelper import ConfigHelper
@@ -44,19 +44,19 @@ class df_helper():
 
     def select(self, query):
         return self
-    
+
     def withColumn(self, col_name, order):
         return self
-    
+
     def format(self, key):
         return self
-    
+
     def options(self, **kwargs):
         return self
-    
+
     def mode(self, mode):
         return self
-    
+
     def save(self):
         self.saved = True
 
@@ -70,11 +70,12 @@ class Test_CassandraSink():
 
     def test_init(self):
        assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
-    
+
     @patch('sink.CassandraSink.Cluster')
     @patch('sink.CassandraSink.Cluster.connect')
+    @patch('sink.CassandraSink.lit')
     @patch('sink.CassandraSink.monotonically_increasing_id')
-    def test_write(self, mock1, mock2, mock3):
+    def test_write(self, mock1, mock2, mock3, mock4):
         df_helper_obj = df_helper()
         self.obj.write(sparkSessionHelper(),df_helper_obj)
         assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file
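
The change above writes every extracted row under a single partition (_partition_key = '1') and clusters it by "_Id", so Cassandra returns the rows in insertion order. As a rough illustration only, not part of this commit, the sketch below shows the shape of the CQL that buildCreateTable now emits and how a consumer could read the data back in time order with the cassandra-driver API already used by CassandraSink; the host, credentials, keyspace, table and column names are placeholders.

    from cassandra.cluster import Cluster
    from cassandra.auth import PlainTextAuthProvider

    # Shape of the CQL emitted by buildCreateTable() after this change
    # (column names are illustrative placeholders):
    #
    #   CREATE TABLE mytable ( "_partition_key" text, "_Id" bigint,
    #       "du-id" text , "throughput" text ,
    #       PRIMARY KEY (("_partition_key"), "_Id"));

    # Placeholder host, credentials and keyspace.
    auth = PlainTextAuthProvider(username="cassandra", password="cassandra")
    cluster = Cluster(["cassandra-host"], auth_provider=auth)
    session = cluster.connect("mykeyspace")

    # Every row is written with _partition_key = '1', so all data sits in one
    # partition and is kept sorted by the clustering key "_Id"; a time-ordered
    # read for model training is therefore a single query:
    rows = session.execute(
        'SELECT * FROM mytable WHERE "_partition_key" = %s ORDER BY "_Id" ASC',
        ('1',)
    )
    for row in rows:
        print(row)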