Joins and Partitions
In [5]:
# Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Joins and Data Partitions")
    .master("local[*]")
    .getOrCreate()
)
spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/15 13:03:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/15 13:03:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/02/15 13:03:38 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Out[5]:
SparkSession - in-memory
In [6]:
# Emp Data & Schema
emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]
emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

# Dept Data & Schema
dept_data = [
    ["101", "Sales", "NYC", "US", "1000000"],
    ["102", "Marketing", "LA", "US", "900000"],
    ["103", "Finance", "London", "UK", "1200000"],
    ["104", "Engineering", "Beijing", "China", "1500000"],
    ["105", "Human Resources", "Tokyo", "Japan", "800000"],
    ["106", "Research and Development", "Perth", "Australia", "1100000"],
    ["107", "Customer Service", "Sydney", "Australia", "950000"]
]
dept_schema = "department_id string, department_name string, city string, country string, budget string"
In [7]:
# Create emp & dept DataFrame
emp = spark.createDataFrame(data=emp_data, schema=emp_schema)
dept = spark.createDataFrame(data=dept_data, schema=dept_schema)
In [8]:
emp.rdd.getNumPartitions()
Out[8]:
2
In [9]:
# Repartition of data using repartition & coalesce
emp_partitioned = emp.repartition(4, "department_id")
In [10]:
emp.rdd.getNumPartitions()
Out[10]:
2
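Note that repartition is a transformation that returns a new DataFrame: the original emp is untouched, which is why it still reports 2 partitions. The repartitioned copy is checked as emp_partitioned further below.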
In [13]:
# Set default parallelism to 8 partitions
# Caveat: spark.default.parallelism is fixed when the SparkContext starts,
# so setting it at runtime like this generally has no effect on new DataFrames
spark.conf.set("spark.default.parallelism", 8)
In [14]:
# Create emp & dept DataFrame
emp_1 = spark.createDataFrame(data=emp_data, schema=emp_schema)
dept_1 = spark.createDataFrame(data=dept_data, schema=dept_schema)
In [15]:
emp.rdd.getNumPartitions()
Out[15]:
2
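The partition count is still 2. Two things are at play: the cell above re-checks the original emp (testing the new setting would call emp_1.rdd.getNumPartitions()), and even emp_1 would typically report the same count, since spark.default.parallelism only takes effect when the SparkContext is created.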
In [16]:
# Repartition of data using repartition & coalesce
emp_partitioned = emp.repartition(4, "department_id")
In [17]:
emp_partitioned.rdd.getNumPartitions()
Out[17]:
4
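The cell comments mention coalesce alongside repartition, but only repartition is demonstrated. As a minimal sketch (variable names follow this notebook): coalesce merges existing partitions without a full shuffle, so it can only reduce the partition count.

# Reduce partitions without a full shuffle (coalesce cannot increase the count)
emp_coalesced = emp_partitioned.coalesce(2)
emp_coalesced.rdd.getNumPartitions()  # expected: 2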
In [18]:
# Tag each row with the id of the partition it lives in
from pyspark.sql.functions import spark_partition_id
emp_1 = emp.repartition(4, "department_id").withColumn("partition_num", spark_partition_id())
In [19]:
emp_1.show()
+-----------+-------------+-------------+---+------+------+----------+-------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_num|
+-----------+-------------+-------------+---+------+------+----------+-------------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|            0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|            0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|            0|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|            0|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|            0|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|            0|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|            1|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|            1|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|            2|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|            2|
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|            2|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|            2|
|        018|          104|    Nancy Liu| 29|      | 50000|2017-06-01|            2|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|            3|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|            3|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|            3|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|            3|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|            3|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|            3|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|            3|
+-----------+-------------+-------------+---+------+------+----------+-------------+
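The seven distinct department_id values are hashed into four partitions, so some partitions hold several departments (102 and 107 both land in partition 0) while all rows for a given department stay together. A quick sketch to count rows per partition and see the skew:

# Rows per partition (sketch): aggregates the partition_num column added above
emp_1.groupBy("partition_num").count().show()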
In [20]:
# INNER JOIN datasets
# select e.name, d.department_name, d.department_id, e.salary
# from emp e inner join dept d on e.department_id = d.department_id
df_joined = emp.alias("e").join(dept.alias("d"), how="inner", on=emp.department_id==dept.department_id)
In [21]:
df_joined.show()
+-----------+-------------+-------------+---+------+------+----------+-------------+--------------------+-------+---------+-------+
|employee_id|department_id|         name|age|gender|salary| hire_date|department_id|     department_name|   city|  country| budget|
+-----------+-------------+-------------+---+------+------+----------+-------------+--------------------+-------+---------+-------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|          101|               Sales|    NYC|       US|1000000|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|          101|               Sales|    NYC|       US|1000000|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|          101|               Sales|    NYC|       US|1000000|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|          102|           Marketing|     LA|       US| 900000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|          102|           Marketing|     LA|       US| 900000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|          102|           Marketing|     LA|       US| 900000|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|          102|           Marketing|     LA|       US| 900000|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|          103|             Finance| London|       UK|1200000|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|          103|             Finance| London|       UK|1200000|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|          103|             Finance| London|       UK|1200000|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|          103|             Finance| London|       UK|1200000|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|          104|         Engineering|Beijing|    China|1500000|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|          104|         Engineering|Beijing|    China|1500000|
|        018|          104|    Nancy Liu| 29|      | 50000|2017-06-01|          104|         Engineering|Beijing|    China|1500000|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|          105|     Human Resources|  Tokyo|    Japan| 800000|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|          105|     Human Resources|  Tokyo|    Japan| 800000|
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|          106|Research and Deve...|  Perth|Australia|1100000|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|          106|Research and Deve...|  Perth|Australia|1100000|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|          107|    Customer Service| Sydney|Australia| 950000|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|          107|    Customer Service| Sydney|Australia| 950000|
+-----------+-------------+-------------+---+------+------+----------+-------------+--------------------+-------+---------+-------+
In [23]:
df_joined.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [department_id#1], [department_id#14], Inner
   :- Sort [department_id#1 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(department_id#1, 200), ENSURE_REQUIREMENTS, [plan_id=315]
   :     +- Filter isnotnull(department_id#1)
   :        +- Scan ExistingRDD[employee_id#0,department_id#1,name#2,age#3,gender#4,salary#5,hire_date#6]
   +- Sort [department_id#14 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(department_id#14, 200), ENSURE_REQUIREMENTS, [plan_id=316]
         +- Filter isnotnull(department_id#14)
            +- Scan ExistingRDD[department_id#14,department_name#15,city#16,country#17,budget#18]
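The plan shows a SortMergeJoin: both inputs are shuffled (Exchange hashpartitioning on department_id, 200 partitions by default) and sorted before joining. For a dimension table as small as dept, a broadcast hash join avoids that shuffle; a minimal sketch using the broadcast hint:

from pyspark.sql.functions import broadcast
# Ship the small dept table to every executor instead of shuffling both sides
df_bhj = emp.join(broadcast(dept), on=emp.department_id == dept.department_id, how="inner")
df_bhj.explain()  # expect BroadcastHashJoin in place of SortMergeJoin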
In [24]:
# LEFT OUTER JOIN datasets
# select e.name, d.department_name, d.department_id, e.salary
# from emp e left outer join dept d on e.department_id = d.department_id
df_joined = emp.alias("e").join(dept.alias("d"), how="left_outer", on=emp.department_id==dept.department_id)
In [25]:
df_joined.select("e.name", "d.department_name", "d.department_id", "e.salary").show()
+-------------+--------------------+-------------+------+
|         name|     department_name|department_id|salary|
+-------------+--------------------+-------------+------+
|     John Doe|               Sales|          101| 50000|
|   Jane Smith|               Sales|          101| 45000|
|James Johnson|               Sales|          101| 70000|
|     Lisa Lee|         Engineering|          104| 47000|
|    Bob Brown|           Marketing|          102| 55000|
|    Alice Lee|           Marketing|          102| 48000|
|     Kate Kim|           Marketing|          102| 51000|
|    Jack Chan|             Finance|          103| 60000|
|    Jill Wong|             Finance|          103| 52000|
|      Tom Tan|             Finance|          103| 58000|
|    Emily Lee|    Customer Service|          107| 46000|
|  Kelly Zhang|    Customer Service|          107| 49000|
|   David Park|         Engineering|          104| 65000|
|    Nancy Liu|         Engineering|          104| 50000|
|    Grace Kim|           Marketing|          102| 53000|
|  Steven Chen|             Finance|          103| 62000|
|    Brian Kim|Research and Deve...|          106| 75000|
|  Michael Lee|Research and Deve...|          106| 63000|
|   Susan Chen|     Human Resources|          105| 54000|
|  George Wang|     Human Resources|          105| 57000|
+-------------+--------------------+-------------+------+
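Here the left outer join returns the same rows as the inner join, because every department_id in emp has a match in dept. One way to confirm is a left anti join, which keeps only the unmatched left-side rows; a sketch (expect an empty result with this data):

# Employees whose department_id has no match in dept
emp.join(dept, on=emp.department_id == dept.department_id, how="left_anti").show()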
In [26]:
# Write the final dataset
df_joined.select("e.name", "d.department_name", "d.department_id","e.salary").write.format("csv").save("data/output/7/emp_joined.csv")
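Note that save writes a directory named emp_joined.csv with one part file per partition (plus a _SUCCESS marker), and the default save mode fails if the path already exists. A hedged sketch for a single output file with overwrite semantics (the output path here is illustrative):

# Collapse to one partition so the directory holds a single part file
(df_joined.select("e.name", "d.department_name", "d.department_id", "e.salary")
    .coalesce(1)
    .write.format("csv")
    .mode("overwrite")
    .save("data/output/7/emp_joined_single.csv"))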
In [27]:
# Bonus TIP
# Joins with cascading conditions
# Join on department_id, but only for departments 101 or 102
# and only when salary is null
df_final = emp.join(dept, how="left_outer",
    on=(emp.department_id==dept.department_id)
        & ((emp.department_id == "101") | (emp.department_id == "102"))
        & (emp.salary.isNull())
)
In [28]:
df_final.show()
+-----------+-------------+-------------+---+------+------+----------+-------------+---------------+----+-------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date|department_id|department_name|city|country|budget|
+-----------+-------------+-------------+---+------+------+----------+-------------+---------------+----+-------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         NULL|           NULL|NULL|   NULL|  NULL|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         NULL|           NULL|NULL|   NULL|  NULL|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         NULL|           NULL|NULL|   NULL|  NULL|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        018|          104|    Nancy Liu| 29|      | 50000|2017-06-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|         NULL|           NULL|NULL|   NULL|  NULL|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|         NULL|           NULL|NULL|   NULL|  NULL|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|         NULL|           NULL|NULL|   NULL|  NULL|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|         NULL|           NULL|NULL|   NULL|  NULL|
+-----------+-------------+-------------+---+------+------+----------+-------------+---------------+----+-------+------+
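Every dept column comes back NULL because no employee has a NULL salary: the emp.salary.isNull() clause makes the join condition false for every row, and the left outer join then keeps each emp row with an unmatched right side. Flipping the predicate shows the matching behaviour; a sketch:

# With isNotNull() the condition can match, so departments 101 and 102 join through
df_check = emp.join(dept, how="left_outer",
    on=(emp.department_id == dept.department_id)
        & ((emp.department_id == "101") | (emp.department_id == "102"))
        & (emp.salary.isNotNull())
)
df_check.show()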