Join Optimizations I

Spark tries to bring rows with the same city_id together on the same executor.

Shuffle Hash Join

In the example above, the smaller city dataset is hashed and joined with the bigger sales table. There is no sorting involved; it is a reliable strategy when one of the two tables is small.
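As a hedged sketch (assuming the sales and city DataFrames that are read further down in this notebook), Spark can be nudged toward this strategy with the standard shuffle_hash join hint:

# Illustrative sketch: nudge Spark toward a shuffle hash join via a join hint.
# Assumes the `sales` (big) and `city` (small) DataFrames loaded later in this notebook.
hashed = sales.join(
    city.hint("shuffle_hash"),  # hash the smaller side within each shuffle partition
    on=sales.city_id == city.city_id,
    how="inner",
)
hashed.explain()  # the plan should show ShuffledHashJoin rather than SortMergeJoin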
Sort Merge Join
In this case the data is sorted first, as in the first image, and then merged and joined.
It is useful when both tables are large.
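Illustratively (again assuming the emp and dept DataFrames loaded below), a sort merge join can also be requested explicitly with the merge hint:

# Hedged sketch: request a sort merge join explicitly via the "merge" hint.
# Assumes the `emp` and `dept` DataFrames read later in this notebook.
smj = emp.join(
    dept.hint("merge"),  # also accepted: "shuffle_merge", "mergejoin"
    on=emp.department_id == dept.department_id,
    how="left_outer",
)
smj.explain()  # expect SortMergeJoin preceded by a Sort on each side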
Broadcast Join
Here the smaller table is copied whole to every executor, so the bigger table never has to be shuffled.

In [2]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Join Optimizations")
    .master("local[*]")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)
spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/22 02:04:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[2]:
SparkSession - in-memory
In [14]:
# Disable AQE and Broadcast join
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
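As a quick sanity check (not part of the original run), the values can be read back from the session configuration:

# Read the settings back to confirm they took effect.
print(spark.conf.get("spark.sql.adaptive.enabled"))            # false
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # -1 disables automatic broadcast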
In [15]:
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"
emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv")
In [16]:
emp.count()
Out[16]:
1000000
In [17]:
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"
dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("data/department_data.csv")
In [18]:
dept.count()
Out[18]:
10
In [20]:
# Without broadcast
df_joined = emp.join(dept, on=emp.department_id == dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()
In [21]:
df_joined.explain()
== Physical Plan ==
*(4) SortMergeJoin [department_id#189], [department_id#211], LeftOuter
:- *(1) Sort [department_id#189 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(department_id#189, 200), ENSURE_REQUIREMENTS, [plan_id=205]
:     +- FileScan csv [first_name#182,last_name#183,job_title#184,dob#185,email#186,phone#187,salary#188,department_id#189] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/SparkLearning/data/employee_records.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- *(3) Sort [department_id#211 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(department_id#211, 200), ENSURE_REQUIREMENTS, [plan_id=217]
      +- *(2) Filter isnotnull(department_id#211)
         +- FileScan csv [department_id#211,department_name#212,description#213,city#214,state#215,country#216] Batched: false, DataFilters: [isnotnull(department_id#211)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/SparkLearning/data/department_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(department_id)], ReadSchema: struct<department_id:int,department_name:string,description:string,city:string,state:string,count...


Broadcast Join
In [22]:
# Join Datasets
from pyspark.sql.functions import broadcast
df_joined = emp.join(broadcast(dept), on=emp.department_id == dept.department_id, how="left_outer")
In [23]:
df_joined.write.format("noop").mode("overwrite").save()
Two jobs run here: one for reading the CSV and another for the join itself.
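To confirm the broadcast in the plan (an extra check, not in the original run), explain() should now show BroadcastHashJoin with a BroadcastExchange instead of the earlier SortMergeJoin:

# Expect BroadcastHashJoin + BroadcastExchange in the physical plan.
df_joined.explain()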

Big Table vs Big Table Join: Sort Merge, No Buckets
In [11]:
# Disable AQE and Broadcast join
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
In [12]:
# Read Sales data
sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"
sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("data/new_sales_10M.csv")
In [13]:
sales.count()
Out[13]:
7202569
In [14]:
# Read City data
city_schema = "city_id string, city string, state string, state_abv string, country string"
city = spark.read.format("csv").schema(city_schema).option("header", True).load("data/cities_large.csv")
In [15]:
city.count()
Out[15]:
2349391
In [16]:
# Join Data
df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")
In [17]:
df_sales_joined.write.format("noop").mode("overwrite").save()
In [18]:
df_sales_joined.explain()
== Physical Plan ==
*(4) SortMergeJoin [city_id#94], [city_id#112], LeftOuter
:- *(1) Sort [city_id#94 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(city_id#94, 200), ENSURE_REQUIREMENTS, [plan_id=264]
:     +- FileScan csv [transacted_at#89,trx_id#90,retailer_id#91,description#92,amount#93,city_id#94] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/SparkLearning/data/new_sales_10M.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit...
+- *(3) Sort [city_id#112 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(city_id#112, 200), ENSURE_REQUIREMENTS, [plan_id=276]
      +- *(2) Filter isnotnull(city_id#112)
         +- FileScan csv [city_id#112,city#113,state#114,state_abv#115,country#116] Batched: false, DataFilters: [isnotnull(city_id#112)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/SparkLearning/data/cities_large.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city_id)], ReadSchema: struct<city_id:string,city:string,state:string,state_abv:string,country:string>
Let's try broadcasting the huge cities dataset now.
In [19]:
# Join Data
from pyspark.sql.functions import broadcast
df_sales_joined_broadcast = sales.join(broadcast(city), on=sales.city_id==city.city_id, how="left_outer")
In [20]:
df_sales_joined_broadcast.write.format("noop").mode("overwrite").save()
25/02/22 02:13:02 WARN MemoryStore: Not enough space to cache broadcast_24 in memory! (computed 432.0 MiB so far)
25/02/22 02:13:02 WARN BlockManager: Persisting block broadcast_24 to disk instead.
25/02/22 02:13:06 WARN MemoryStore: Not enough space to cache broadcast_24 in memory! (computed 432.0 MiB so far)
25/02/22 02:13:08 WARN MemoryStore: Not enough space to cache broadcast_24 in memory! (computed 432.0 MiB so far)
25/02/22 02:13:19 WARN MemoryStore: Not enough space to cache broadcast_24 in memory! (computed 432.0 MiB so far)
25/02/22 02:13:32 WARN MemoryStore: Not enough space to cache broadcast_24 in memory! (computed 432.0 MiB so far)
There isn't enough storage memory in the executors for the ~432 MiB broadcast, so it spills to disk and jobs slow down or start failing.
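One hedged way to recover (beyond simply not broadcasting a 2.3M-row table) is to hand the decision back to Spark: restore a sensible autoBroadcastJoinThreshold, re-enable AQE, and drop the explicit broadcast() so the sort merge plan is used again. The values below are illustrative:

# Illustrative recovery: stop forcing the broadcast and let Spark choose.
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")  # roughly the default threshold

# Re-run the join without broadcast(); Spark falls back to sort merge for two big tables.
df_sales_joined = sales.join(city, on=sales.city_id == city.city_id, how="left_outer")
df_sales_joined.write.format("noop").mode("overwrite").save()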

In [44]:
spark.stop()