Broadcast Variables
# Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Distributed Shared Variables")
    .master("local[*]")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)
spark
SparkSession - in-memory
In Spark, data is distributed across a cluster and processed in parallel. This means that each node in the cluster works on a separate partition of the data. However, certain operations may require sharing data among all nodes. Shared variables in Spark allow for efficient data sharing and ensure consistency across nodes. There are two types of shared variables in PySpark: Broadcast variables and Accumulators.
Broadcast variables are read-only variables that are cached on each worker node in a Spark cluster, allowing tasks running on those nodes to access the same shared data without the need for communication overhead. They are particularly useful when you have a large read-only dataset, such as a lookup table, that you want to use across multiple tasks.
# Small read-only lookup table, broadcast to every executor
lookup_data = {"apple": 1, "banana": 2, "orange": 3}
broadcast_lookup = spark.sparkContext.broadcast(lookup_data)

data = [("apple", 3), ("banana", 5), ("orange", 2)]
data_rdd = spark.sparkContext.parallelize(data)

def calculate_total_cost(item, count, lookup):
    return count * lookup[item]

total_cost_rdd = data_rdd.map(lambda x: calculate_total_cost(x[0], x[1], broadcast_lookup.value))
total_cost_rdd.collect()
[3, 10, 6]
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv")
emp.show(truncate=False)
+----------+----------+----------------------------------+----------+----------------------------+---------------------+--------+-------------+
|first_name|last_name |job_title                         |dob       |email                       |phone                |salary  |department_id|
+----------+----------+----------------------------------+----------+----------------------------+---------------------+--------+-------------+
|Richard   |Morrison  |Public relations account executive|1973-05-05|melissagarcia@example.org   |(699)525-4827        |512653.0|8            |
|Bobby     |Mccarthy  |Barrister's clerk                 |1974-04-25|llara@example.net           |(750)846-1602x7458   |999836.0|7            |
|Dennis    |Norman    |Land/geomatics surveyor           |1990-06-24|jturner@example.net         |873.820.0518x825     |131900.0|10           |
|John      |Monroe    |Retail buyer                      |1968-06-16|erik33@example.net          |820-813-0557x624     |485506.0|1            |
|Michelle  |Elliott   |Air cabin crew                    |1975-03-31|tiffanyjohnston@example.net |(705)900-5337        |604738.0|8            |
|Ashley    |Montoya   |Cartographer                      |1976-01-16|patrickalexandra@example.org|211.440.5466         |483339.0|6            |
|Nathaniel |Smith     |Quality manager                   |1985-06-28|lori44@example.net          |936-403-3179         |419644.0|7            |
|Faith     |Cummings  |Industrial/product designer       |1978-07-01|ygordon@example.org         |(889)246-5588        |205939.0|7            |
|Margaret  |Sutton    |Administrator, education          |1975-08-16|diana44@example.net         |001-647-530-5036x7523|671167.0|8            |
|Mary      |Sutton    |Freight forwarder                 |1979-12-28|ryan36@example.com          |422.562.7254x3159    |993829.0|7            |
|Jake      |King      |Lexicographer                     |1994-07-11|monica93@example.org        |+1-535-652-9715x66854|702101.0|4            |
|Heather   |Haley     |Music tutor                       |1981-06-01|stephanie65@example.net     |(652)815-7973x298    |570960.0|6            |
|Thomas    |Thomas    |Chartered management accountant   |2001-07-17|pwilliams@example.com       |001-245-848-0028x5105|339441.0|6            |
|Leonard   |Carlson   |Art therapist                     |1990-10-18|gabrielmurray@example.com   |9247590563           |469728.0|8            |
|Mark      |Wood      |Market researcher                 |1963-10-13|nicholas76@example.com      |311.439.1606x3342    |582291.0|4            |
|Tracey    |Washington|Travel agency manager             |1986-05-07|mark07@example.com          |001-912-206-6456     |146456.0|4            |
|Rachael   |Rodriguez |Media buyer                       |1966-12-02|griffinmary@example.org     |+1-791-344-7586x548  |544732.0|1            |
|Tara      |Liu       |Financial adviser                 |1998-10-12|alexandraobrien@example.org |216.696.6061         |399503.0|3            |
|Ana       |Joseph    |Retail manager                    |1995-01-10|rmorse@example.org          |(726)363-7526x9965   |761988.0|10           |
|Richard   |Hall      |Engineer, civil (contracting)     |1967-03-02|brandoncardenas@example.com |(964)451-9007x22496  |660659.0|4            |
+----------+----------+----------------------------------+----------+----------------------------+---------------------+--------+-------------+
only showing top 20 rows
# Variable (Lookup)
dept_names = {
    1: 'Department 1',
    2: 'Department 2',
    3: 'Department 3',
    4: 'Department 4',
    5: 'Department 5',
    6: 'Department 6',
    7: 'Department 7',
    8: 'Department 8',
    9: 'Department 9',
    10: 'Department 10',
}
# Broadcast the variable
broadcast_dept_names = spark.sparkContext.broadcast(dept_names)
broadcast_dept_names.value
{1: 'Department 1', 2: 'Department 2', 3: 'Department 3', 4: 'Department 4', 5: 'Department 5', 6: 'Department 6', 7: 'Department 7', 8: 'Department 8', 9: 'Department 9', 10: 'Department 10'}
from pyspark.sql.functions import udf, col

@udf
def get_dept_name(dept_id):
    # Each executor reads its cached copy of the broadcast dict locally
    return broadcast_dept_names.value.get(dept_id)

emp_final = emp.withColumn("dept_name", get_dept_name(col("department_id")))
emp_final.show()
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|    dept_name|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8| Department 8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7| Department 7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|Department 10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1| Department 1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8| Department 8|
|    Ashley|   Montoya|        Cartographer|1976-01-16|patrickalexandra@...|        211.440.5466|483339.0|            6| Department 6|
| Nathaniel|     Smith|     Quality manager|1985-06-28|  lori44@example.net|        936-403-3179|419644.0|            7| Department 7|
|     Faith|  Cummings|Industrial/produc...|1978-07-01| ygordon@example.org|       (889)246-5588|205939.0|            7| Department 7|
|  Margaret|    Sutton|Administrator, ed...|1975-08-16| diana44@example.net|001-647-530-5036x...|671167.0|            8| Department 8|
|      Mary|    Sutton|   Freight forwarder|1979-12-28|  ryan36@example.com|   422.562.7254x3159|993829.0|            7| Department 7|
|      Jake|      King|       Lexicographer|1994-07-11|monica93@example.org|+1-535-652-9715x6...|702101.0|            4| Department 4|
|   Heather|     Haley|         Music tutor|1981-06-01|stephanie65@examp...|   (652)815-7973x298|570960.0|            6| Department 6|
|    Thomas|    Thomas|Chartered managem...|2001-07-17|pwilliams@example...|001-245-848-0028x...|339441.0|            6| Department 6|
|   Leonard|   Carlson|       Art therapist|1990-10-18|gabrielmurray@exa...|          9247590563|469728.0|            8| Department 8|
|      Mark|      Wood|   Market researcher|1963-10-13|nicholas76@exampl...|   311.439.1606x3342|582291.0|            4| Department 4|
|    Tracey|Washington|Travel agency man...|1986-05-07|  mark07@example.com|    001-912-206-6456|146456.0|            4| Department 4|
|   Rachael| Rodriguez|         Media buyer|1966-12-02|griffinmary@examp...| +1-791-344-7586x548|544732.0|            1| Department 1|
|      Tara|       Liu|   Financial adviser|1998-10-12|alexandraobrien@e...|        216.696.6061|399503.0|            3| Department 3|
|       Ana|    Joseph|      Retail manager|1995-01-10|  rmorse@example.org|  (726)363-7526x9965|761988.0|           10|Department 10|
|   Richard|      Hall|Engineer, civil (...|1967-03-02|brandoncardenas@e...| (964)451-9007x22496|660659.0|            4| Department 4|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
only showing top 20 rows
Observe that even though this is logically a join with the department lookup (normally a shuffle transformation), no shuffle appears in the DAG: each task resolves the department name locally from its executor's cached copy of the broadcast variable.
Accumulators
For some aggregations, the partitions of data sitting on each executor would have to be collected in one place before the aggregate can be computed. That collection means shuffling data across the cluster, which is expensive and best avoided. Accumulators offer an alternative: each executor processes its own partitions row by row and adds to the accumulator, and Spark merges the per-task updates on the driver, so no shuffle is needed.
# Accumulator to sum the salaries of department 6
dept_sal = spark.sparkContext.accumulator(0)

# Use foreach (an action), so each row's update is applied exactly once
def calculate_salary(department_id, salary):
    if department_id == 6:
        dept_sal.add(salary)

emp.foreach(lambda row: calculate_salary(row.department_id, row.salary))
Observe in the DAG that the job completes in a single stage: the accumulator updates are merged on the driver without any shuffling.
dept_sal.value
50294510721.0