Join Optimization II
Data Spill to Disk¶
spark.stop()
# Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Join Optimization Pt2")
    .master("local[*]")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.sql.files.maxPartitionBytes", "128MB")
).getOrCreate()
spark
# Disable AQE and Broadcast join
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.shuffle.spill", "false") # Disable spilling during shuffle
from pyspark.sql.functions import broadcast
# Read Sales data
sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"
sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("data/new_sales_10M.csv")
# Read City data
city_schema = "city_id string, city string, state string, state_abv string, country string"
city = spark.read.format("csv").schema(city_schema).option("header", True).load("data/cities_large.csv")
# Join Data
from pyspark.sql.functions import broadcast
df_sales_joined_broadcast = sales.join(broadcast(city), on=sales.city_id==city.city_id, how="left_outer")
df_sales_joined_broadcast.write.format("noop").mode("overwrite").save()
25/02/22 04:25:09 WARN MemoryStore: Not enough space to cache broadcast_2 in memory! (computed 432.0 MiB so far)
25/02/22 04:25:09 WARN BlockManager: Persisting block broadcast_2 to disk instead.
25/02/22 04:25:13 WARN MemoryStore: Not enough space to cache broadcast_2 in memory! (computed 432.0 MiB so far)
25/02/22 04:25:28 WARN MemoryStore: Not enough space to cache broadcast_2 in memory! (computed 432.0 MiB so far)
25/02/22 04:25:42 WARN MemoryStore: Not enough space to cache broadcast_2 in memory! (computed 432.0 MiB so far)
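The warnings show that the broadcast block of the city table (~432 MiB computed) no longer fits in the 512M executor memory, so Spark persists it to disk, which defeats the purpose of broadcasting. A minimal sanity check before forcing a broadcast, assuming the input is a single local CSV file; the 50 MB cutoff below is illustrative, not a Spark default:
import os
# Rough on-disk size of the dimension table; a broadcast candidate should be
# comfortably smaller than the executor memory (512M here)
city_file_size_mb = os.path.getsize("data/cities_large.csv") / (1024 * 1024)
if city_file_size_mb < 50:  # illustrative cutoff
    df_joined = sales.join(broadcast(city), on=sales.city_id == city.city_id, how="left_outer")
else:
    # Let Spark pick the join strategy instead of forcing a broadcast
    df_joined = sales.join(city, on=sales.city_id == city.city_id, how="left_outer")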

Bucketing¶



Bucket i from sales and bucket i from city will be read by the same executor
Partitioning vs Bucketing¶
Partitioning in Spark
Partitioning is an integral concept in Spark that controls how the data is physically distributed across various nodes in the cluster during data processing. Spark, by default, performs data partitioning, which can also be manually optimized based on the workload.
How does Partitioning work?
In Spark, an RDD (Resilient Distributed Dataset), DataFrame, or Dataset is divided into a number of partitions, each of which can be computed on different nodes in the cluster. Data in each partition is processed in parallel, ensuring efficient use of cluster resources and enhancing the speed of computation.
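As a quick sketch with the sales DataFrame loaded above, the current partition count can be inspected and changed explicitly; the target of 8 partitions is illustrative:
# Number of partitions Spark chose when reading the CSV
print(sales.rdd.getNumPartitions())
# Redistribute into 8 partitions hashed by city_id; repartition is a wide
# transformation and triggers a shuffle
sales_by_city = sales.repartition(8, "city_id")
print(sales_by_city.rdd.getNumPartitions())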
Custom Partitioning
While Spark’s default partitioning works well in most cases, there are instances where custom partitioning might be necessary. For example, if your data is skewed, with some keys having significantly more values than others, the default partitioning may result in some partitions being much larger than others. This could lead to unequal distribution of work across the nodes. In such scenarios, one can use a custom partitioner to ensure a more even distribution of data.
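The DataFrame API does not expose custom partitioners directly, but the RDD API does. A minimal sketch, assuming a pair RDD keyed by city_id; the hot-key values and partition counts are hypothetical:
from pyspark.rdd import portable_hash
# Key the sales rows by city_id so partitionBy can route them explicitly
pairs = sales.rdd.map(lambda row: (row.city_id, row))
HOT_KEYS = {"city_123", "city_456"}  # hypothetical heavily skewed keys
NUM_PARTITIONS = 20
def skew_aware_partitioner(key):
    # Reserve the last 4 partitions for hot keys and hash everything else into
    # the first 16 (partitionBy applies % NUM_PARTITIONS on top)
    if key in HOT_KEYS:
        return 16 + (portable_hash(key) % 4)
    return portable_hash(key) % 16
repartitioned = pairs.partitionBy(NUM_PARTITIONS, skew_aware_partitioner)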
Bucketing in Spark
Bucketing is a technique in Spark that is used to distribute data across multiple buckets or files based on the hash of a column value. This method is particularly useful when working with large datasets and performing operations like joins, which can be computationally expensive.
How does Bucketing work?
Bucketing works by specifying a column and a number of buckets when the DataFrame is written out as a table. Spark then applies a hash function to the specified column and divides the data into buckets corresponding to the hash values. The number of buckets remains fixed, so the distribution of data doesn’t change with the size of the data.
Why is Bucketing important?
Performance Improvement: Bucketing can significantly improve the performance of Spark jobs that involve shuffle operations like groupBy, join, orderBy, etc., by limiting the number of outputs and reducing the data shuffle across the network.
Avoid Data Skew: Bucketing can help avoid data skew in certain operations, leading to more efficient utilization of resources.
Reduce Data Scanned: When performing operations on a subset of data, bucketing allows Spark to avoid a full data scan, reducing IO operations and improving query performance.
It’s essential to note that bucketing has its overhead, namely the computational cost of computing the hash of the bucket column and the increased storage used by many small files. Therefore, bucketing is most beneficial when the computational or IO savings outweigh these costs.
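As a rough sizing heuristic (an assumption, not a Spark rule), the bucket count can be chosen so that each bucket file lands near a comfortable file size, e.g. around 128 MB:
# Hypothetical sizing: a 50 GB table with ~128 MB target bucket files
dataset_size_gb = 50
target_file_mb = 128
num_buckets = max(1, int(dataset_size_gb * 1024 / target_file_mb))
print(num_buckets)  # 400 buckets for this example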
Practical Examples
Partitioning in Spark
Consider you have a large DataFrame df and you frequently run operations on it filtered by the country column. By default, Spark might distribute the data across the partitions arbitrarily, which means each operation involves scanning all partitions. But if you partition the DataFrame by country, Spark can optimize these operations by only accessing the relevant partitions.
df.write.partitionBy("country").parquet("/path/to/data.parquet")
df = spark.read.parquet("/path/to/data.parquet")
df.filter(df.country == 'USA').show()
Spark skips all the partitions not related to 'USA' (partition pruning)
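This can be checked in the physical plan (same hypothetical df and path as above); the country predicate should appear as a partition filter on the scan rather than as a row-level filter:
# Look for PartitionFilters on the FileScan node; with a partitioned Parquet
# source this indicates that only the country=USA directory is read
df.filter(df.country == 'USA').explain()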
Bucketing in Spark
Suppose you have a DataFrame salesData with columns transactionId, customerId, itemId, and amount. If you frequently run queries that involve operations like JOIN or GROUP BY on customerId, you can use bucketing to speed up these operations.
numBuckets = 100
salesData.write.bucketBy(numBuckets, "customerId").sortBy("transactionId").saveAsTable("salesData_bucketed")
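Once written, the bucketed table can be read back and joined without a shuffle, provided the other side is bucketed on the same column with the same bucket count; customers_bucketed below is a hypothetical second table prepared that way:
sales_bucketed = spark.table("salesData_bucketed")
customers_bucketed = spark.table("customers_bucketed")  # hypothetical, also bucketed by customerId into 100 buckets
# With matching bucket columns and counts, the plan should show a SortMergeJoin
# with no Exchange (shuffle) on either side
joined = sales_bucketed.join(customers_bucketed, on="customerId", how="inner")
joined.explain()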

Demo¶
# Read Sales data
sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"
sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("data/new_sales_10M.csv")
# Read City data
city_schema = "city_id string, city string, state string, state_abv string, country string"
city = spark.read.format("csv").schema(city_schema).option("header", True).load("data/cities_large.csv")
# Write Sales data in Buckets
sales.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "SparkLearning/data/sales_bucket").saveAsTable("sales_bucket")

The data is divided into 6 partitions, with 4 files in each partition corresponding to the number of buckets (6 × 4 = 24 bucket files plus the _SUCCESS marker)

spark.sql('''show tables in default''').show()
+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default|sales_bucket|      false|
+---------+------------+-----------+
city.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "SparkLearning/data/city_bucket").saveAsTable("city_bucket")

spark.sql('''show tables in default''').show()
+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default| city_bucket|      false|
|  default|sales_bucket|      false|
+---------+------------+-----------+
# Join datasets
# Read Sales table
sales_bucket = spark.read.table("sales_bucket")
# Read City table
city_bucket = spark.read.table("city_bucket")
df_joined_bucketed = sales_bucket.join(city_bucket, on=sales_bucket.city_id==city_bucket.city_id, how="left_outer")
# Write dataset
df_joined_bucketed.write.format("noop").mode("overwrite").save()

Observe that there is no significant difference in runtime (both joins take roughly 40s), but the bucketed join involves no shuffle, and any subsequent join on these two tables will be faster because the bucketing is already in place.
Bucketing is a one-time operation.
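To confirm that no shuffle is involved, inspect the physical plan of the bucketed join; with both tables bucketed by city_id into the same number of buckets there should be no Exchange node, whereas a plain join of the raw CSVs would show Exchange on both sides:
# Expect a SortMergeJoin over two bucketed scans, with no Exchange step
df_joined_bucketed.explain()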
Tasks Breakdown¶

Here there are 4 tasks because of 4 buckets.
The input record count for each task is (records in city bucket i) + (records in sales bucket i) − header rows
import subprocess
import pandas as pd

# Set options to prevent truncation
pd.set_option('display.max_columns', None) # Show all columns
pd.set_option('display.max_rows', None) # Show all rows
pd.set_option('display.max_colwidth', None) # Show full content of each column
ls_output = subprocess.check_output("ls -lh /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket", shell=True).decode("utf-8")
ls_lines = ls_output.splitlines()
ls_data = [line.split() for line in ls_lines[1:]] # Skip the first line (total count)
df = pd.DataFrame(ls_data)
# Show the table
df
 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
---|---|---|---|---|---|---|---|---|---|
0 | -rw-r--r-- | 1 | ubuntu | ubuntu | 0 | Feb | 22 | 04:27 | _SUCCESS |
1 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22M | Feb | 22 | 04:27 | part-00000-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
2 | -rw-r--r-- | 1 | ubuntu | ubuntu | 31M | Feb | 22 | 04:27 | part-00000-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
3 | -rw-r--r-- | 1 | ubuntu | ubuntu | 29M | Feb | 22 | 04:27 | part-00000-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
4 | -rw-r--r-- | 1 | ubuntu | ubuntu | 28M | Feb | 22 | 04:27 | part-00000-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
5 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22M | Feb | 22 | 04:27 | part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
6 | -rw-r--r-- | 1 | ubuntu | ubuntu | 31M | Feb | 22 | 04:27 | part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
7 | -rw-r--r-- | 1 | ubuntu | ubuntu | 29M | Feb | 22 | 04:27 | part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
8 | -rw-r--r-- | 1 | ubuntu | ubuntu | 28M | Feb | 22 | 04:27 | part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
9 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22M | Feb | 22 | 04:27 | part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
10 | -rw-r--r-- | 1 | ubuntu | ubuntu | 31M | Feb | 22 | 04:27 | part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
11 | -rw-r--r-- | 1 | ubuntu | ubuntu | 29M | Feb | 22 | 04:27 | part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
12 | -rw-r--r-- | 1 | ubuntu | ubuntu | 28M | Feb | 22 | 04:27 | part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
13 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22M | Feb | 22 | 04:27 | part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
14 | -rw-r--r-- | 1 | ubuntu | ubuntu | 31M | Feb | 22 | 04:27 | part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
15 | -rw-r--r-- | 1 | ubuntu | ubuntu | 29M | Feb | 22 | 04:27 | part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
16 | -rw-r--r-- | 1 | ubuntu | ubuntu | 28M | Feb | 22 | 04:27 | part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
17 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22M | Feb | 22 | 04:27 | part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
18 | -rw-r--r-- | 1 | ubuntu | ubuntu | 31M | Feb | 22 | 04:27 | part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
19 | -rw-r--r-- | 1 | ubuntu | ubuntu | 29M | Feb | 22 | 04:27 | part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
20 | -rw-r--r-- | 1 | ubuntu | ubuntu | 28M | Feb | 22 | 04:27 | part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
21 | -rw-r--r-- | 1 | ubuntu | ubuntu | 19M | Feb | 22 | 04:27 | part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
22 | -rw-r--r-- | 1 | ubuntu | ubuntu | 27M | Feb | 22 | 04:27 | part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00001.c000.csv |
23 | -rw-r--r-- | 1 | ubuntu | ubuntu | 26M | Feb | 22 | 04:27 | part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00002.c000.csv |
24 | -rw-r--r-- | 1 | ubuntu | ubuntu | 25M | Feb | 22 | 04:27 | part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00003.c000.csv |
Check bucket 0's files (suffix _00000) across all partitions
ls_output = subprocess.check_output("ls -l /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-*_00000.c000.csv", shell=True).decode("utf-8")
ls_lines = ls_output.splitlines()
ls_data = [line.split() for line in ls_lines[1:]] # Note: ls with explicit file arguments prints no "total" line, so this slice drops the first file (part-00000)
df = pd.DataFrame(ls_data)
# Show the table
df
 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
---|---|---|---|---|---|---|---|---|---|
0 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22938797 | Feb | 22 | 04:27 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
1 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22878206 | Feb | 22 | 04:27 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
2 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22884271 | Feb | 22 | 04:27 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
3 | -rw-r--r-- | 1 | ubuntu | ubuntu | 22902138 | Feb | 22 | 04:27 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
4 | -rw-r--r-- | 1 | ubuntu | ubuntu | 19899932 | Feb | 22 | 04:27 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
Count the total number of records in bucket 0 across all partitions
ls_output = subprocess.check_output("wc -l /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-*_00000.c000.csv", shell=True).decode("utf-8")
ls_lines = ls_output.splitlines()
ls_data = [line.split() for line in ls_lines[1:]] # Drops the first file's line (part-00000); the trailing "total" still includes it
df = pd.DataFrame(ls_data)
# Show the table
df
 | 0 | 1 |
---|---|---|
0 | 245699 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00001-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
1 | 245036 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00002-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
2 | 245055 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00003-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
3 | 245300 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00004-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
4 | 213147 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/sales_bucket/part-00005-0c6abbd5-e8ef-4da3-94a0-fce1f9cad25e_00000.c000.csv |
5 | 1439458 | total |
Check total number of records for city bucket 0
ls_output = subprocess.check_output("wc -l /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/city_bucket/part-*_00000.c000.csv", shell=True).decode("utf-8")
ls_lines = ls_output.splitlines()
ls_data = [line.split() for line in ls_lines[1:]] # Drops the first file's line (part-00000); the trailing "total" still includes it
df = pd.DataFrame(ls_data)
# Show the table
df
 | 0 | 1 |
---|---|---|
0 | 277240 | /home/ubuntu/SparkLearning/spark-warehouse/SparkLearning/data/city_bucket/part-00001-84513e10-2ffe-4fca-8940-b3b78828a59b_00000.c000.csv |
1 | 586724 | total |
Total (sales bucket 0 + city bucket 0)
print(1439458+586724)
2026182
Now subtract the header rows (6)
print(1439458+586724-6) # equals the number of input records for the first task
2026176

Points to Note¶
Join columns differ from the bucket column - shuffle on both sides
Join column is the same, but only one table is bucketed on it - shuffle on the non-bucketed table
Join column is the same, but the bucket counts differ - shuffle on the table with the smaller bucket count
Join column is the same and the bucket counts match - no shuffle, fast joins (the sketch below shows one way to verify this from the plan)
Too many buckets with not enough data leads to the small-files problem
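A quick way to see which case applies is to look for Exchange nodes in the physical plan. A minimal sketch that captures the output of explain(); the string match is a heuristic:
import io
from contextlib import redirect_stdout
def has_shuffle(df):
    """Heuristic: True if the physical plan printed by explain() contains an Exchange node."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain()
    return "Exchange" in buf.getvalue()
# Same join column, same bucket count on both sides - expect False (no shuffle)
print(has_shuffle(sales_bucket.join(city_bucket, sales_bucket.city_id == city_bucket.city_id, "left_outer")))
# Contrived join on a non-bucket column of sales - expect True (an Exchange appears on at least one side)
print(has_shuffle(sales_bucket.join(city_bucket, sales_bucket.trx_id == city_bucket.city_id, "left_outer")))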