Optimizing Shuffles
Note: This demo is not very clear because I'm running it on a local cluster, which cannot read the entire dataset; it is best to run it on a Databricks cluster.
In [1]:
# Spark Session
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("Optimizing Shuffles")
.master("local[*]")
.config("spark.cores.max", 16)
.config("spark.executor.cores", 4)
.config("spark.executor.memory", "512M")
.getOrCreate()
)
spark
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/17 02:05:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [2]:
spark.sparkContext.defaultParallelism
Out[2]:
2
In [3]:
spark.conf.set("spark.sql.adaptive.enabled",False)
spark.conf.set("spark.sql.adaptive.coaleascePartitions.enabled",False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
spark.conf.set("spark.sql.adaptive.enabled",False)
spark.conf.set("spark.sql.adaptive.coaleascePartitions.enabled",False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
In [10]:
# Read EMP CSV file with 10M records
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"
emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv")
In [11]:
# Find out avg salary as per dept
from pyspark.sql.functions import avg
emp_avg = emp.groupBy("department_id").agg(avg("salary").alias("avg_sal"))
In [12]:
# Write data for performance benchmarking
emp_avg.write.format("noop").mode("overwrite").save()
In [19]:
emp_avg.count()
Out[19]:
10
If you look carefully at the Spark UI, the aggregation stage runs 200 tasks because the default number of shuffle partitions is 200.
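One way to confirm this is to inspect the physical plan (the exact plan text varies by Spark version, so treat this as a sketch): with AQE disabled, the shuffle Exchange hash-partitions into 200 partitions.

# Inspect the physical plan; the Exchange operator should show something like
# "Exchange hashpartitioning(department_id, 200)" while AQE is disabled.
emp_avg.explain()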
In [16]:
spark.conf.get("spark.sql.shuffle.partitions")
Out[16]:
'200'
Some of the tasks do not even read any shuffled data, so running them is just a waste of compute. So let's decrease the number of shuffle partitions to avoid this.
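How do you pick a value? A common rule of thumb (an assumption on my part, not something measured in this run) is to size shuffle partitions so each task handles roughly 100-200 MB of shuffle data; a sketch with placeholder numbers:

# Hypothetical sizing sketch: the numbers below are placeholders, read the real
# shuffle write size off the Spark UI for your job.
total_shuffle_mb = 2048                       # total shuffle write observed in the UI
target_partition_mb = 128                     # rough per-task target
num_partitions = max(2, total_shuffle_mb // target_partition_mb)
print(num_partitions)                         # -> 16 with these placeholder numbers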
In [17]:
spark.conf.set("spark.sql.shuffle.partitions", 16)
In [18]:
from pyspark.sql.functions import spark_partition_id
emp.withColumn("partition_id", spark_partition_id()).where("partition_id = 0").show()
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|partition_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|           0|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|           0|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|           0|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|           0|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8|           0|
|    Ashley|   Montoya|        Cartographer|1976-01-16|patrickalexandra@...|        211.440.5466|483339.0|            6|           0|
| Nathaniel|     Smith|     Quality manager|1985-06-28|  lori44@example.net|        936-403-3179|419644.0|            7|           0|
|     Faith|  Cummings|Industrial/produc...|1978-07-01| ygordon@example.org|       (889)246-5588|205939.0|            7|           0|
|  Margaret|    Sutton|Administrator, ed...|1975-08-16| diana44@example.net|001-647-530-5036x...|671167.0|            8|           0|
|      Mary|    Sutton|   Freight forwarder|1979-12-28|  ryan36@example.com|   422.562.7254x3159|993829.0|            7|           0|
|      Jake|      King|       Lexicographer|1994-07-11|monica93@example.org|+1-535-652-9715x6...|702101.0|            4|           0|
|   Heather|     Haley|         Music tutor|1981-06-01|stephanie65@examp...|   (652)815-7973x298|570960.0|            6|           0|
|    Thomas|    Thomas|Chartered managem...|2001-07-17|pwilliams@example...|001-245-848-0028x...|339441.0|            6|           0|
|   Leonard|   Carlson|       Art therapist|1990-10-18|gabrielmurray@exa...|          9247590563|469728.0|            8|           0|
|      Mark|      Wood|   Market researcher|1963-10-13|nicholas76@exampl...|   311.439.1606x3342|582291.0|            4|           0|
|    Tracey|Washington|Travel agency man...|1986-05-07|  mark07@example.com|    001-912-206-6456|146456.0|            4|           0|
|   Rachael| Rodriguez|         Media buyer|1966-12-02|griffinmary@examp...| +1-791-344-7586x548|544732.0|            1|           0|
|      Tara|       Liu|   Financial adviser|1998-10-12|alexandraobrien@e...|        216.696.6061|399503.0|            3|           0|
|       Ana|    Joseph|      Retail manager|1995-01-10|  rmorse@example.org|  (726)363-7526x9965|761988.0|           10|           0|
|   Richard|      Hall|Engineer, civil (...|1967-03-02|brandoncardenas@e...| (964)451-9007x22496|660659.0|            4|           0|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
only showing top 20 rows
In [22]:
# Read the partitioned data
emp_part = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv/")
In [23]:
emp_avg = emp_part.groupBy("department_id").agg(avg("salary").alias("avg_sal"))
In [24]:
emp_avg.write.format("noop").mode("overwrite").save()
Observe in the Spark UI that there are fewer tasks now, and most of them read some shuffle data instead of sitting idle.
Also, since we ran the group by on partitioned data, records for a given department are not scattered across executors, so less data has to be shuffled for the aggregation.
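The notebook doesn't show how that physically grouped copy of the data was produced; one way to create it (an assumption on my part, with a hypothetical output path) is to repartition on department_id before writing, so each output file holds a single department:

# Hypothetical preparation step: co-locate each department's rows in one partition
# before writing, so the later groupBy shuffles only small partial aggregates.
emp.repartition("department_id") \
    .write.format("csv") \
    .option("header", True) \
    .mode("overwrite") \
    .save("data/employee_records_partitioned")   # hypothetical path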
In [27]:
from pyspark.sql.functions import spark_partition_id
In [29]:
emp_parti = emp_avg.withColumn("partition_id", spark_partition_id())
In [30]:
emp_parti.show()
+-------------+------------------+------------+
|department_id|           avg_sal|partition_id|
+-------------+------------------+------------+
|           10| 502682.2575766687|           2|
|            5| 504167.9429997006|           2|
|            1|504876.96401242825|           3|
|            3| 504697.6808514883|           3|
|            2| 503563.2174529479|           6|
|            6|504428.12590014644|           9|
|            9| 504945.3055672206|           9|
|            7|504514.38453985273|          11|
|            4| 505419.4963977089|          14|
|            8| 505299.1226286386|          15|
+-------------+------------------+------------+
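For completeness: the AQE partition coalescing we disabled at the top of the notebook can do this tuning automatically at runtime; re-enabling it is a sketch like this:

# Let AQE merge small shuffle partitions automatically instead of hand-tuning
# spark.sql.shuffle.partitions.
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)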