python3 && apt-get install -y \
python3-pip
-# Install OpenJDK-8 for Spark
+# Install OpenJDK-11 for Spark
RUN apt-get update && \
- apt-get install openjdk-8-jre openjdk-8-jdk -y && \
+ apt-get install openjdk-11-jre openjdk-11-jdk -y && \
apt-get clean;
-
+
RUN apt-get update && \
apt-get install scala -y && \
apt-get clean;
-
+
# Setup JAVA_HOME -- useful for docker commandline
-ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
+ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/
RUN export JAVA_HOME
WORKDIR ${TA_DIR}
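Spark resolves its JVM through JAVA_HOME at launch, so a quick runtime check inside the image confirms the OpenJDK 11 switch took effect. A minimal sanity-check sketch (plain Python, nothing here is project code):

import os
import subprocess

# Expect the path set by `ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/` above.
java_home = os.environ.get("JAVA_HOME", "")
assert "java-11-openjdk" in java_home, f"Unexpected JAVA_HOME: {java_home}"

# `java -version` writes its banner to stderr; the first line should mention 11.x.
banner = subprocess.run(["java", "-version"], capture_output=True, text=True).stderr
print(banner.splitlines()[0])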
from cassandra.auth import PlainTextAuthProvider
import time
import json
-from pyspark.sql.functions import monotonically_increasing_id
+from pyspark.sql.functions import lit, monotonically_increasing_id
class CassandraSink(Sink):
"""
self.logger.debug("Data writing to Sink " + self.flavour)
self.create_table(sparkdf)
- sparkdf = sparkdf.select("*").withColumn("__Id", monotonically_increasing_id())
-
-
-
+
+ # Set _partition_key to the literal '1' so that all rows land in the same partition
+ sparkdf = sparkdf.select("*") \
+ .withColumn("_partition_key", lit('1')) \
+ .withColumn("_Id", monotonically_increasing_id())
+
+
+
write_options = {
"table": self.tableName,
"keyspace": self.keyspace,
self.logger.debug(
"*** Data written to Sink *** " + self.keyspace + "." + self.tableName
)
-
+
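The actual save call is elided in this excerpt; with the DataFrame API of the spark-cassandra-connector it would typically complete along these lines (a sketch under that assumption, not code taken from this file):

# Assumes write_options carries the "table" and "keyspace" entries built above.
sparkdf.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(**write_options) \
    .mode("append") \
    .save()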
def create_table(self, sparkdf):
session_default.execute(create_ks_query)
session = cluster.connect(self.keyspace)
-
-
+
+
session.execute(self.buildDeleteTable())
-
+
query = self.buildCreateTable(sparkdf)
session.execute(query)
time.sleep(3)
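For reference, session_default and session come from the DataStax Python driver (hence the PlainTextAuthProvider import earlier). The connection setup elided from this hunk usually looks roughly like the following; the host, credentials and keyspace name are purely illustrative:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Illustrative values only; the real ones come from the sink's configuration.
auth = PlainTextAuthProvider(username="cassandra", password="cassandra")
cluster = Cluster(["127.0.0.1"], port=9042, auth_provider=auth)

session_default = cluster.connect()  # session with no keyspace bound
# A typical shape for the keyspace bootstrap (the real create_ks_query lives elsewhere):
session_default.execute(
    "CREATE KEYSPACE IF NOT EXISTS demo_ks "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"
)
session = cluster.connect("demo_ks")  # session bound to the keyspace

The time.sleep(3) after the CREATE presumably gives the schema change a moment to settle before Spark starts writing.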
def buildCreateTable(self, sparkdf):
"""
Builds simple cassandra query for creating table.
- Columns names as per sparkdf headers, choosing first header __Id in sparkdf
- as primary key for table
+ Column names follow the sparkdf headers;
+ _partition_key in sparkdf is used as the partition key and
+ _Id as the clustering key for the table
"""
- query = "CREATE TABLE " + self.tableName + " ( \"__Id\" BIGINT,"
+ query = "CREATE TABLE " + self.tableName + ' ( "_partition_key" text, "_Id" bigint, '
if sparkdf is not None:
col_list = sparkdf.schema.names
# To maintain the column name case sensitivity
for col in col_list:
- query = query + ' "' + str(col) + '"' + " text " + ","
+ query = query + ' "' + str(col) + '" text ' + ","
- # Creating __Id column as Primary Key
- query = query + "PRIMARY KEY(" + '"__Id"' + "));"
+ # Use _partition_key as the partition key and _Id as the clustering key
+ query = query + 'PRIMARY KEY (("_partition_key"), "_Id"));'
self.logger.debug("Create table query " + query)
return query
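To make the string concatenation concrete: for a hypothetical table events and a DataFrame with columns name and age, buildCreateTable produces a statement of this shape (up to the extra spaces the concatenation leaves in):

# Hypothetical inputs: self.tableName == "events", sparkdf columns ["name", "age"].
expected = (
    'CREATE TABLE events ( "_partition_key" text, "_Id" bigint, '
    '"name" text , "age" text ,'
    'PRIMARY KEY (("_partition_key"), "_Id"));'
)

Quoting every identifier preserves the original column-name casing; since every source column is stored as text and all rows share the literal partition key '1', the whole dataset sits in one Cassandra partition ordered by _Id.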
# In order to import dataextraction functions
sys.path.extend(["dataextraction/"])
-os.environ['CODE_DIR_PATH']='test'
-load_dotenv('test/test_env.env') # Loading Env Variables
+os.environ['CODE_DIR_PATH']='test'
+load_dotenv('test/test_env.env') # Loading Env Variables
from sink.CassandraSink import CassandraSink
from ConfigHelper import ConfigHelper
def select(self, query):
return self
-
+
def withColumn(self, col_name, order):
return self
-
+
def format(self, key):
return self
-
+
def options(self, **kwargs):
return self
-
+
def mode(self, mode):
return self
-
+
def save(self):
self.saved = True
def test_init(self):
assert self.obj.logger != None, "Cassandra Sink Object Creation Failed"
-
+
@patch('sink.CassandraSink.Cluster')
@patch('sink.CassandraSink.Cluster.connect')
+ @patch('sink.CassandraSink.lit')
@patch('sink.CassandraSink.monotonically_increasing_id')
- def test_write(self, mock1, mock2, mock3):
+ def test_write(self, mock1, mock2, mock3, mock4):
df_helper_obj = df_helper()
self.obj.write(sparkSessionHelper(),df_helper_obj)
assert df_helper_obj.saved, "Data Failed to save"
\ No newline at end of file
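A note for anyone extending test_write: stacked @patch decorators are applied bottom-up, so the injected mocks arrive in reverse order of the decorators. Under that rule the mapping for the stack above is as sketched below, together with one assertion that could be added after the write call (an illustration, not part of this patch):

# mock1 -> sink.CassandraSink.monotonically_increasing_id
# mock2 -> sink.CassandraSink.lit
# mock3 -> sink.CassandraSink.Cluster.connect
# mock4 -> sink.CassandraSink.Cluster
mock2.assert_called_once_with('1')  # write() tags every row's partition key via lit('1')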