Writing Data
In [1]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Writing data in Spark")
    .master("local[*]")
    .getOrCreate()
)
spark
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/02/16 12:23:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [2]:
spark.sparkContext.defaultParallelism
Out[2]:
2
Since we are running Spark locally, there is only one executor, and it is the driver itself.
The executor has 2 cores, so it can process two partitions at a time.
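As a quick sanity check (a minimal sketch on top of the session created above, not part of the original notebook), an RDD created without an explicit partition count picks up this default:

# Sketch: parallelize() without numSlices falls back to defaultParallelism
rdd = spark.sparkContext.parallelize(range(100))
print(spark.sparkContext.defaultParallelism)  # 2
print(rdd.getNumPartitions())                 # 2 as well, one partition per core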
In [3]:
emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"
In [4]:
emp = spark.createDataFrame(data=emp_data, schema=emp_schema)
emp.rdd.getNumPartitions()
Out[4]:
2
In [5]:
emp.write.format("parquet").mode("overwrite").save("data/output/11/2/emp.parquet")
emp.write.format("parquet").mode("overwrite").save("data/output/11/2/emp.parquet")
Both cores are used to write the data, and each one writes 10 records.
The output is a folder named emp.parquet that contains two snappy-compressed part files, one per partition/core.
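To see that layout, one can list the output folder. This is a small sketch using Python's glob (not part of the original notebook); the exact part-file names vary from run to run.

import glob

# List whatever Spark wrote; expect two part-*.snappy.parquet files (one per partition)
# plus bookkeeping files such as _SUCCESS.
for path in sorted(glob.glob("data/output/11/2/emp.parquet/*")):
    print(path)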
In [6]:
emp.write.format("csv").mode("error").option("header", True).save("data/output/11/3/emp.csv")
emp.write.format("csv").mode("error").option("header", True).save("data/output/11/3/emp.csv")
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[6], line 1
----> 1 emp.write.format("csv").mode("error").option("header", True).save("data/output/11/3/emp.csv")

File /usr/local/lib/python3.12/dist-packages/pyspark/sql/readwriter.py:1463, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1461     self._jwrite.save()
   1462 else:
-> 1463     self._jwrite.save(path)

File ~/spark-3.5.4-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/lib/python3.12/dist-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/ubuntu/SparkLearning/data/output/11/3/emp.csv already exists. Set mode as "overwrite" to overwrite the existing path.
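The write fails because mode("error") (also spelled "errorifexists", and the default) refuses to write to a path that already exists; here the path was left over from an earlier run. As a sketch of the standard DataFrameWriter save modes:

# Standard save modes:
#   "error" / "errorifexists" -> raise if the path already exists (the default, used above)
#   "overwrite"               -> replace whatever is at the path
#   "append"                  -> add new files alongside the existing ones
#   "ignore"                  -> silently skip the write if the path already exists
emp.write.format("csv").mode("ignore").option("header", True).save("data/output/11/3/emp.csv")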
In [8]:
# Write the data partitioned by department_id to the output location
emp.write.format("csv").partitionBy("department_id").mode("overwrite").option("header", True).save("data/output/11/5/emp.csv")
In [9]:
# Bonus tip
# What if we need to write only one output file to share with a downstream consumer?
emp.repartition(1).write.format("csv").mode("overwrite").option("header", True).save("data/output/11/6/emp.csv")
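An alternative worth knowing: coalesce(1) also produces a single output file and avoids the full shuffle that repartition(1) triggers, at the cost of funnelling all the data through one task, so it only suits small outputs. A sketch, with a hypothetical output path:

# Sketch: coalesce(1) merges the existing partitions without a shuffle before writing
# (the path data/output/11/7/emp.csv is illustrative, not from the original notebook)
emp.coalesce(1).write.format("csv").mode("overwrite").option("header", True).save("data/output/11/7/emp.csv")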