Data Formats
In [1]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading from CSV Files")
    .master("local[*]")
    .getOrCreate()
)

spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/15 14:27:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [5]:
# Read a csv file into dataframe
df_1 = spark.read.format("csv").option("inferSchema", True).load("data/emp.csv")
Observation: one stage is created, and it reads just one record to infer the schema from the data.
In [6]:
df_1.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
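Because the header option was not set, the header row is treated as data, every column is inferred as string, and the columns get the generic names _c0 to _c6. The next cell fixes this properly with the header option; as a side note, generic columns can also be renamed after the fact with toDF. A minimal sketch, assuming the columns appear in the same order as in the file:

# Hypothetical renaming of the generic _c0.._c6 columns (assumes the file's column order)
df_named = df_1.toDF("employee_id", "department_id", "name", "age", "gender", "salary", "hire_date")
df_named.printSchema()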
In [11]:
df_3 = spark.read.format("csv").option("header",True).option("inferSchema", True).load("data/emp.csv")
In [12]:
df_3.show()
+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|         12|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|         13|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|
|         14|          107|    Emily Lee| 26|Female| 46000|2019-01-01|
|         15|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|
|         16|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|
|         17|          105|  George Wang| 34|  Male| 57000|2016-03-15|
|         18|          104|    Nancy Liu| 29|Female| 50000|2017-06-01|
|         19|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|
|         20|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
+-----------+-------------+-------------+---+------+------+----------+
When we call show(), observe that Spark reads the total number of records minus 1, because one of the rows is consumed as the header.
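One quick way to check this is to compare the raw line count of the file with the parsed record count; a small sketch using the text reader:

# Raw line count (includes the header line) vs. parsed record count
raw_lines = spark.read.text("data/emp.csv").count()
parsed_rows = df_3.count()
print(raw_lines, parsed_rows)   # expect raw_lines == parsed_rows + 1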
In [14]:
df_3.printSchema()
root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)
So with header=True and inferSchema=True, two jobs get created: one reads just a single record to pick up the header, and the other scans a subset of the records to infer the schema.
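On large files that inference scan can get expensive; the CSV reader's samplingRatio option limits how much of the data is sampled for inference. A sketch (the 0.1 fraction here is just illustrative):

# Infer the schema from roughly 10% of the rows instead of scanning everything
df_sampled = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("samplingRatio", 0.1)
    .load("data/emp.csv")
)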
In [16]:
# Reading with Schema
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"
df_schema = spark.read.format("csv").option("header",True).schema(_schema).load("data/emp.csv")
Now we see that no new job is created: because we are already providing the metadata (the schema), Spark doesn't have to scan the file to infer anything.
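The DDL string above is one way to supply the schema; the same thing can be expressed programmatically with StructType. A minimal equivalent sketch:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Programmatic equivalent of the DDL schema string
emp_schema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("department_id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType()),
    StructField("salary", DoubleType()),
    StructField("hire_date", DateType()),
])

df_schema_v2 = spark.read.format("csv").option("header", True).schema(emp_schema).load("data/emp.csv")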
In [17]:
df_schema.show()
+-----------+-------------+-------------+---+------+-------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|
+-----------+-------------+-------------+---+------+-------+----------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|
|          7|          101|James Johnson| 42|  Male|70000.0|2012-03-15|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|
|         11|          104|   David Park| 38|  Male|65000.0|2015-11-01|
|         12|          105|   Susan Chen| 31|Female|54000.0|2017-02-15|
|         13|          106|    Brian Kim| 45|  Male|75000.0|2011-07-01|
|         14|          107|    Emily Lee| 26|Female|46000.0|2019-01-01|
|         15|          106|  Michael Lee| 37|  Male|63000.0|2014-09-30|
|         16|          107|  Kelly Zhang| 30|Female|49000.0|2018-04-01|
|         17|          105|  George Wang| 34|  Male|57000.0|2016-03-15|
|         18|          104|    Nancy Liu| 29|Female|50000.0|2017-06-01|
|         19|          103|  Steven Chen| 36|  Male|62000.0|2015-08-01|
|         20|          102|    Grace Kim| 32|Female|53000.0|2018-11-01|
+-----------+-------------+-------------+---+------+-------+----------+
Spark launched only one job here: since it already knows the schema, it doesn't need to infer it.
Read Modes in Spark
In [19]:
# Handle BAD records - PERMISSIVE (Default mode)
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, bad_record string"
df_p = spark.read.format("csv").schema(_schema).option("header", True).load("data/emp_new.csv")
In [20]:
Copied!
df_p.show()
df_p.show()
+-----------+-------------+-------------+---+------+-------+----------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|bad_record|
+-----------+-------------+-------------+---+------+-------+----------+----------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|      NULL|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|      NULL|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|      NULL|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|      NULL|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|      NULL|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|      NULL|
|          7|          101|James Johnson| 42|  Male|   NULL|2012-03-15|      NULL|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|      NULL|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|      NULL|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|      NULL|
|         11|          104|   David Park| 38|  Male|65000.0|      NULL|      NULL|
|         12|          105|   Susan Chen| 31|Female|54000.0|2017-02-15|      NULL|
|         13|          106|    Brian Kim| 45|  Male|75000.0|2011-07-01|      NULL|
|         14|          107|    Emily Lee| 26|Female|46000.0|2019-01-01|      NULL|
|         15|          106|  Michael Lee| 37|  Male|63000.0|2014-09-30|      NULL|
|         16|          107|  Kelly Zhang| 30|Female|49000.0|2018-04-01|      NULL|
|         17|          105|  George Wang| 34|  Male|57000.0|2016-03-15|      NULL|
|         18|          104|    Nancy Liu| 29|Female|50000.0|2017-06-01|      NULL|
|         19|          103|  Steven Chen| 36|  Male|62000.0|2015-08-01|      NULL|
|         20|          102|    Grace Kim| 32|Female|53000.0|2018-11-01|      NULL|
+-----------+-------------+-------------+---+------+-------+----------+----------+
25/02/15 14:24:26 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema: Header length: 7, schema size: 8 CSV file: file:///home/ubuntu/SparkLearning/data/emp_new.csv
In [3]:
# Handle BAD records - PERMISSIVE (Default mode)
# columnNameOfCorruptRecord tells Spark to capture the raw malformed line in the bad_record column
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, bad_record string"
df_p_1 = spark.read.format("csv").schema(_schema).option("columnNameOfCorruptRecord", "bad_record").option("header", True).load("data/emp_new.csv")
In [4]:
df_p_1.show()
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|          bad_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|                NULL|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|                NULL|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|                NULL|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|                NULL|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|                NULL|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|                NULL|
|          7|          101|James Johnson| 42|  Male|   NULL|2012-03-15|007,101,James Joh...|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|                NULL|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|                NULL|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|                NULL|
|         11|          104|   David Park| 38|  Male|65000.0|      NULL|011,104,David Par...|
|         12|          105|   Susan Chen| 31|Female|54000.0|2017-02-15|                NULL|
|         13|          106|    Brian Kim| 45|  Male|75000.0|2011-07-01|                NULL|
|         14|          107|    Emily Lee| 26|Female|46000.0|2019-01-01|                NULL|
|         15|          106|  Michael Lee| 37|  Male|63000.0|2014-09-30|                NULL|
|         16|          107|  Kelly Zhang| 30|Female|49000.0|2018-04-01|                NULL|
|         17|          105|  George Wang| 34|  Male|57000.0|2016-03-15|                NULL|
|         18|          104|    Nancy Liu| 29|Female|50000.0|2017-06-01|                NULL|
|         19|          103|  Steven Chen| 36|  Male|62000.0|2015-08-01|                NULL|
|         20|          102|    Grace Kim| 32|Female|53000.0|2018-11-01|                NULL|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
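To pull out just the malformed rows and see the full captured line (the display above truncates it), one option is to filter on the corrupt-record column; a sketch (caching first is recommended when querying that column):

# Cache, then show only the rows whose raw line was captured as corrupt
df_p_1.cache()
df_p_1.filter("bad_record IS NOT NULL").show(truncate=False)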
In [7]:
# Handle BAD records - DROPMALFORMED
# First, read again without setting any mode (PERMISSIVE by default) to compare against
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"
df_m = spark.read.format("csv").option("header", True).schema(_schema).load("data/emp_new.csv")
In [8]:
df_m.show()
+-----------+-------------+-------------+---+------+-------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|
+-----------+-------------+-------------+---+------+-------+----------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|
|          7|          101|James Johnson| 42|  Male|   NULL|2012-03-15|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|
|         11|          104|   David Park| 38|  Male|65000.0|      NULL|
|         12|          105|   Susan Chen| 31|Female|54000.0|2017-02-15|
|         13|          106|    Brian Kim| 45|  Male|75000.0|2011-07-01|
|         14|          107|    Emily Lee| 26|Female|46000.0|2019-01-01|
|         15|          106|  Michael Lee| 37|  Male|63000.0|2014-09-30|
|         16|          107|  Kelly Zhang| 30|Female|49000.0|2018-04-01|
|         17|          105|  George Wang| 34|  Male|57000.0|2016-03-15|
|         18|          104|    Nancy Liu| 29|Female|50000.0|2017-06-01|
|         19|          103|  Steven Chen| 36|  Male|62000.0|2015-08-01|
|         20|          102|    Grace Kim| 32|Female|53000.0|2018-11-01|
+-----------+-------------+-------------+---+------+-------+----------+
In [10]:
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"
df_m = spark.read.format("csv").option("header", True).option("mode","DROPMALFORMED").schema(_schema).load("data/emp_new.csv")
In [11]:
df_m.show()
+-----------+-------------+-----------+---+------+-------+----------+
|employee_id|department_id|       name|age|gender| salary| hire_date|
+-----------+-------------+-----------+---+------+-------+----------+
|          1|          101|   John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101| Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|  Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|  Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|  Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|  Jill Wong| 32|Female|52000.0|2018-07-01|
|          8|          102|   Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|    Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|   Lisa Lee| 27|Female|47000.0|2018-08-01|
|         12|          105| Susan Chen| 31|Female|54000.0|2017-02-15|
|         13|          106|  Brian Kim| 45|  Male|75000.0|2011-07-01|
|         14|          107|  Emily Lee| 26|Female|46000.0|2019-01-01|
|         15|          106|Michael Lee| 37|  Male|63000.0|2014-09-30|
|         16|          107|Kelly Zhang| 30|Female|49000.0|2018-04-01|
|         17|          105|George Wang| 34|  Male|57000.0|2016-03-15|
|         18|          104|  Nancy Liu| 29|Female|50000.0|2017-06-01|
|         19|          103|Steven Chen| 36|  Male|62000.0|2015-08-01|
|         20|          102|  Grace Kim| 32|Female|53000.0|2018-11-01|
+-----------+-------------+-----------+---+------+-------+----------+
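With DROPMALFORMED the two malformed rows (employee_id 7 and 11) are silently dropped. To see exactly which records were discarded, one option is to anti-join against the earlier PERMISSIVE read (df_p); a sketch:

# Rows present in the PERMISSIVE read but missing after DROPMALFORMED
dropped = df_p.select("employee_id").join(df_m.select("employee_id"), on="employee_id", how="left_anti")
dropped.show()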
In [13]:
# Handle BAD records - FAILFAST
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"
df_f = spark.read.format("csv").option("header", True).option("mode", "FAILFAST").schema(_schema).load("data/emp_new.csv")
In [14]:
df_f.show()
25/02/15 14:32:07 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [7,101,James Johnson,42,Male,null,15414]. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1611)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:457)
	...
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "Low"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:366)
	... 26 more
Caused by: java.lang.NumberFormatException: For input string: "Low"
	at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.lang.Double.parseDouble(Double.java:538)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$12(UnivocityParser.scala:208)
	... 29 more
25/02/15 14:32:07 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (ip-172-31-20-231.ec2.internal executor driver)
25/02/15 14:32:07 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[14], line 1
----> 1 df_f.show()
...
Py4JJavaError: An error occurred while calling o120.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (ip-172-31-20-231.ec2.internal executor driver): org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [7,101,James Johnson,42,Male,null,15414]. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "Low"
Caused by: java.lang.NumberFormatException: For input string: "Low"
	...
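FAILFAST raises on the first malformed record, so any action on df_f will throw. If you want to keep that strictness but fail gracefully in application code, you can wrap the action; a minimal sketch (the broad except is only for illustration):

# The action still fails fast; we just catch and report the parsing error
try:
    df_f.show()
except Exception as e:
    print("CSV parsing failed in FAILFAST mode:", type(e).__name__)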
In [15]:
# BONUS TIP
# Multiple options
_options = {
    "header": "true",
    "inferSchema": "true",
    "mode": "PERMISSIVE"
}

df = spark.read.format("csv").options(**_options).load("data/emp.csv")
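Equivalently, the DataFrameReader.csv shortcut accepts these options as keyword arguments; a sketch of the same read:

# Same read using the csv() shortcut instead of format("csv") + options(**_options)
df_kw = spark.read.csv("data/emp.csv", header=True, inferSchema=True, mode="PERMISSIVE")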
In [16]:
spark.stop()