Add a Cassandra clustering key for data extraction 97/13297/8
authorSungjin Lee <sodyn99@gmail.com>
Tue, 1 Oct 2024 13:30:29 +0000 (13:30 +0000)
committerSungjin Lee <sodyn99@gmail.com>
Thu, 3 Oct 2024 09:45:13 +0000 (09:45 +0000)
- Add a partition key
- Use Cassandra key settings to enable data sorting and facilitate proper time-series model training
- Add _Id to the clustering key
- Unify the key column name
- Add lit for test_cassandrasink.py

Issue-ID: AIMLFW-148

Change-Id: Ia624d331fc625e126075a3766132bfc07bf1297a
Signed-off-by: Sungjin Lee <sodyn99@gmail.com>
Dockerfile
dataextraction/sink/CassandraSink.py
requirements.txt
test/test_cassandrasink.py

index ffba2aa..64f1aa8 100644 (file)
@@ -25,17 +25,17 @@ RUN apt-get update && apt-get install -y \
     python3 && apt-get install -y \
     python3-pip
 
-# Install OpenJDK-8 for Spark
+# Install OpenJDK-11 for Spark
 RUN apt-get update && \
-    apt-get install openjdk-8-jre openjdk-8-jdk -y && \
+    apt-get install openjdk-11-jre openjdk-11-jdk -y && \
     apt-get clean;
-       
+
 RUN apt-get update && \
     apt-get install scala -y && \
     apt-get clean;
-    
+
 # Setup JAVA_HOME -- useful for docker commandline
-ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
+ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/
 RUN export JAVA_HOME
 
 WORKDIR ${TA_DIR}
index 1393eed..e8aab1f 100644 (file)
@@ -27,7 +27,7 @@ from cassandra.cluster import Cluster
 from cassandra.auth import PlainTextAuthProvider
 import time
 import json
-from pyspark.sql.functions import monotonically_increasing_id 
+from pyspark.sql.functions import lit, monotonically_increasing_id
 
 
 class CassandraSink(Sink):
@@ -98,10 +98,14 @@ class CassandraSink(Sink):
         """
         self.logger.debug("Data writing to Sink " + self.flavour)
         self.create_table(sparkdf)
-        sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
-        
-        
-        
+
+        # Set '1' to _partition_key to store all data in the same partition
+        sparkdf = sparkdf.select("*") \
+                .withColumn("_partition_key", lit('1')) \
+                .withColumn("_Id", monotonically_increasing_id())
+
+
+
         write_options = {
             "table": self.tableName,
             "keyspace": self.keyspace,
@@ -118,7 +122,7 @@ class CassandraSink(Sink):
         self.logger.debug(
             "*** Data written to Sink *** " + self.keyspace + "." + self.tableName
         )
-        
+
 
 
     def create_table(self, sparkdf):
@@ -136,10 +140,10 @@ class CassandraSink(Sink):
         session_default.execute(create_ks_query)
 
         session = cluster.connect(self.keyspace)
-        
-        
+
+
         session.execute(self.buildDeleteTable())
-        
+
         query = self.buildCreateTable(sparkdf)
         session.execute(query)
         time.sleep(3)
@@ -148,18 +152,19 @@ class CassandraSink(Sink):
     def buildCreateTable(self, sparkdf):
         """
         Builds simple cassandra query for creating table.
-        Columns names as per sparkdf headers, choosing first header __Id in sparkdf
-        as primary key for table
+        Columns names as per sparkdf headers,
+        choosing _partition_key in sparkdf as partition key,
+        _Id as clustering key for table
         """
-        query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+        query = "CREATE TABLE " + self.tableName + ' ( "_partition_key" text, "_Id" bigint, '
         if sparkdf is not None:
             col_list = sparkdf.schema.names
             # To maintain the column name case sensitivity
             for col in col_list:
-                query = query + ' "' + str(col) + '"' + " text " + ","
+                query = query + ' "' + str(col) + '" text ' + ","
 
-        # Creating __Id column as Primary Key
-        query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+        # Creating _partition_key column as Partition Key, _Id as Clustering Key
+        query = query + 'PRIMARY KEY (("_partition_key"), "_Id"));'
         self.logger.debug("Create table query " + query)
         return query
 
index 06461e1..3f41422 100644 (file)
@@ -1,7 +1,7 @@
-pyspark==3.1.2
+pyspark==3.5.3
 cassandra-driver==3.25.0
 json5==0.9.6
-influxdb-client==1.20.0
+influxdb-client==1.46.0
 pandas==2.2.2
 importlib-metadata==4.8.1
 lru-dict==1.1.7
index 17e6cad..5aee4e4 100644 (file)
@@ -23,8 +23,8 @@ from mock import patch
 
 # In order to import dataextraction functions
 sys.path.extend(["dataextraction/"])
-os.environ['CODE_DIR_PATH']='test'   
-load_dotenv('test/test_env.env') # Loading Env Variables 
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
 from sink.CassandraSink import CassandraSink
 from ConfigHelper import ConfigHelper
 
@@ -44,19 +44,19 @@ class df_helper():
 
     def select(self, query):
         return self
-    
+
     def withColumn(self, col_name, order):
         return self
-    
+
     def format(self, key):
         return self
-    
+
     def options(self, **kwargs):
         return self
-    
+
     def mode(self, mode):
         return self
-    
+
     def save(self):
         self.saved = True
 
@@ -70,11 +70,12 @@ class Test_CassandraSink():
 
     def test_init(self):
         assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
-    
+
     @patch('sink.CassandraSink.Cluster')
     @patch('sink.CassandraSink.Cluster.connect')
+    @patch('sink.CassandraSink.lit')
     @patch('sink.CassandraSink.monotonically_increasing_id')
-    def test_write(self, mock1, mock2, mock3):
+    def test_write(self, mock1, mock2, mock3, mock4):
         df_helper_obj = df_helper()
         self.obj.write(sparkSessionHelper(),df_helper_obj)
         assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file