DAG UI
In [1]:
# Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Understand Plans and DAG")
    .master("local[*]")
    .getOrCreate()
)
spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/16 16:43:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
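Since the rest of this notebook is read alongside the Spark UI, it helps to know where that UI is served. The snippet below is an optional addition (not part of the original run) that prints the UI address of the local session:

# The Spark UI (Jobs / Stages / SQL tabs) for this session;
# for a local session this is typically http://localhost:4040.
print(spark.sparkContext.uiWebUrl)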
In [2]:
spark.conf.set("spark.sql.adaptive.enabled",False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled",False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
In [3]:
# Create dataframes
df_1 = spark.range(4, 200, 2) # 1 job -> 8 tasks -> WriteExchange
df_2 = spark.range(2, 200, 4) # 1 job -> 8 tasks -> WriteExchange
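The "8 tasks" in the comments assumes local[*] picked up 8 cores; the actual task count equals the number of partitions of the range DataFrame, which follows the local parallelism (the plans later in this notebook show splits=2 for this particular run). An optional check:

# Tasks per range job = partitions of the DataFrame,
# which defaults to the local parallelism (CPU cores for local[*]).
print(df_1.rdd.getNumPartitions())
print(spark.sparkContext.defaultParallelism)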
In [4]:
# Shuffle
df_3 = df_1.repartition(5) # ReadExchange -> 1 job -> 5 tasks -> WriteExchange
df_4 = df_2.repartition(7) # ReadExchange -> 1 job -> 7 tasks -> WriteExchange
In [5]:
df_4.rdd.getNumPartitions()
Out[5]:
7
In [6]:
# Join the dataframes
# Shuffle
df_joined = df_3.join(df_4, on="id") # 1 stage -> read data from WriteExchange -> write to 200 partitions
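The join shuffles both sides into spark.sql.shuffle.partitions partitions, which stays at the default of 200 because AQE coalescing was disabled above. An optional check using only the public API:

# The shuffle introduced by the join writes to 200 partitions.
print(spark.conf.get("spark.sql.shuffle.partitions"))
print(df_joined.rdd.getNumPartitions())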
In [7]:
# Get the sum of ids
df_sum = df_joined.selectExpr("sum(id) as total_sum") # 1 stage -> 1 task read data from above 200 partitions
In [15]:
df_joined.count()
Out[15]:
49
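The count of 49 can be verified with plain Python: df_1 holds the even ids 4, 6, ..., 198 and df_2 holds 2, 6, 10, ..., 198, so the inner join keeps only the ids present in both ranges.

# Sanity check of the join cardinality without touching Spark.
print(len(set(range(4, 200, 2)) & set(range(2, 200, 4))))  # 49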
In [10]:
df_sum.explain()
== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=253]
   +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *(5) Project [id#0L]
         +- *(5) SortMergeJoin [id#0L], [id#2L], Inner
            :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [plan_id=237]
            :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=236]
            :        +- *(1) Range (4, 200, step=2, splits=2)
            +- *(4) Sort [id#2L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#2L, 200), ENSURE_REQUIREMENTS, [plan_id=244]
                  +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=243]
                     +- *(3) Range (2, 200, step=4, splits=2)
The number in '( )' before each operator maps to the number shown after "WholeStageCodegen" in the Spark UI.
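If you prefer to see the mapping without cross-referencing the UI, the formatted explain mode (available in Spark 3.x) annotates each operator with its codegen id; this is an optional extra step:

# Each operator in the formatted plan carries a "[codegen id : N]" tag
# that matches the WholeStageCodegen (N) box in the SQL tab of the UI.
df_sum.explain(mode="formatted")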
How Does Spark Skip Stages?
In [11]:
# Union the data again to see the skipped stages
df_union = df_sum.union(df_4)
In [12]:
df_union.show()
+---------+
|total_sum|
+---------+
|     4998|
|       14|
|       86|
|       42|
|      146|
|      134|
|      142|
|      162|
|       74|
|       94|
|       34|
|      198|
|      182|
|      126|
|      174|
|       98|
|       10|
|       82|
|      122|
|      186|
+---------+
only showing top 20 rows
Observe that all the stages that previously computed df_sum and df_4 are now marked as skipped.
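Any further action over the same lineage reuses the shuffle files that were already written, so the skipping can be reproduced at will; an optional follow-up:

# Re-running an action on df_union reuses the existing shuffle output,
# so the upstream stages again show up as "Skipped" in the Jobs tab.
df_union.count()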
In [13]:
df_union.explain()
== Physical Plan ==
Union
:- *(6) HashAggregate(keys=[], functions=[sum(id#0L)])
:  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=491]
:     +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#0L)])
:        +- *(5) Project [id#0L]
:           +- *(5) SortMergeJoin [id#0L], [id#2L], Inner
:              :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [plan_id=475]
:              :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=474]
:              :        +- *(1) Range (4, 200, step=2, splits=2)
:              +- *(4) Sort [id#2L ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(id#2L, 200), ENSURE_REQUIREMENTS, [plan_id=482]
:                    +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=481]
:                       +- *(3) Range (2, 200, step=4, splits=2)
+- ReusedExchange [id#25L], Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=481]
Observe that we are reusing the same exchange (shuffle write) rather than creating a new one by reading df_4 again.
Go to the SQL / DataFrame tab to clearly see that the data is not read again.
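A rough programmatic way to confirm the reuse without opening the UI is to capture the output of explain() and search it for the ReusedExchange node; the stdout-capture trick below is just an illustration, not a Spark API:

import io, contextlib

# explain() prints to stdout, so capture it and look for ReusedExchange.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    df_union.explain()
print("ReusedExchange" in buf.getvalue())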