Trabajo con JSON
In [2]:
Copied!
# Spark Session
# Create (or reuse) a local SparkSession; local[*] runs on all available cores.
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("Reading and Parsing JSON Files/Data")
.master("local[*]")
.getOrCreate()
)
spark
# Spark Session
# Create (or reuse) a local SparkSession; local[*] runs on all available cores.
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("Reading and Parsing JSON Files/Data")
.master("local[*]")
.getOrCreate()
)
spark
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/02/15 16:55:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 25/02/15 16:55:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Out[2]:
SparkSession - in-memory
In [3]:
Copied!
# Read Single line JSON file
# The default JSON reader expects one complete JSON object per line (JSON Lines).
df_single = spark.read.format("json").load("data/order_singleline.json")
# Read Single line JSON file
# The default JSON reader expects one complete JSON object per line (JSON Lines).
df_single = spark.read.format("json").load("data/order_singleline.json")
In [4]:
Copied!
# Inspect the schema Spark inferred from the JSON data (nested array/struct types).
df_single.printSchema()
df_single.printSchema()
root |-- contact: array (nullable = true) | |-- element: long (containsNull = true) |-- customer_id: string (nullable = true) |-- order_id: string (nullable = true) |-- order_line_items: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- amount: double (nullable = true) | | |-- item_id: string (nullable = true) | | |-- qty: long (nullable = true)
In [5]:
Copied!
# truncate=False prints full column values instead of clipping them at 20 chars.
df_single.show(truncate = False)
df_single.show(truncate = False)
+------------------------+-----------+--------+------------------------------------+ |contact |customer_id|order_id|order_line_items | +------------------------+-----------+--------+------------------------------------+ |[9000010000, 9000010001]|C001 |O101 |[{102.45, I001, 6}, {2.01, I003, 2}]| +------------------------+-----------+--------+------------------------------------+
In [10]:
Copied!
# Read Multiline JSON file
# multiLine=True lets a single JSON record span multiple lines in the file.
df_multi = spark.read.format("json").option("multiLine", True).load("data/order_multiline.json")
# Read Multiline JSON file
# multiLine=True lets a single JSON record span multiple lines in the file.
df_multi = spark.read.format("json").option("multiLine", True).load("data/order_multiline.json")
In [11]:
Copied!
# Same inferred schema as the single-line read (see the printed output below).
df_multi.printSchema()
df_multi.printSchema()
root |-- contact: array (nullable = true) | |-- element: long (containsNull = true) |-- customer_id: string (nullable = true) |-- order_id: string (nullable = true) |-- order_line_items: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- amount: double (nullable = true) | | |-- item_id: string (nullable = true) | | |-- qty: long (nullable = true)
In [13]:
Copied!
# Show the full, untruncated row content of the multiline read.
df_multi.show(truncate=False)
df_multi.show(truncate=False)
+------------------------+-----------+--------+------------------------------------+ |contact |customer_id|order_id|order_line_items | +------------------------+-----------+--------+------------------------------------+ |[9000010000, 9000010001]|C001 |O101 |[{102.45, I001, 6}, {2.01, I003, 2}]| +------------------------+-----------+--------+------------------------------------+
In [15]:
Copied!
# With Schema
# Explicit DDL schema: only the listed columns are read, so order_line_items is
# dropped from the result. NOTE: the element type is required — "contact array"
# alone is invalid Spark DDL (the duplicated copy had lost "<long>" to HTML
# tag stripping).
_schema = "customer_id string, order_id string, contact array<long>"
df_schema = spark.read.format("json").schema(_schema).load("data/order_singleline.json")
# With Schema
_schema = "customer_id string, order_id string, contact array<long>"
df_schema = spark.read.format("json").schema(_schema).load("data/order_singleline.json")
In [16]:
Copied!
# Only the three columns named in the explicit schema appear in the output.
df_schema.show()
df_schema.show()
+-----------+--------+--------------------+ |customer_id|order_id| contact| +-----------+--------+--------------------+ | C001| O101|[9000010000, 9000...| +-----------+--------+--------------------+
Writing Complex Schema¶
In [17]:
Copied!
# Full DDL schema covering the nested data: contact as array<string> and
# order_line_items as array<struct<...>>. (The duplicated copy had its generic
# type parameters stripped out — "array" / "array>" is invalid Spark DDL.)
_schema = "contact array<string>,customer_id string,order_id string,order_line_items array<struct<amount double,item_id string,qty long>>"
_schema = "contact array<string>,customer_id string,order_id string,order_line_items array<struct<amount double,item_id string,qty long>>"
In [19]:
Copied!
# Re-read the file, this time applying the explicit complex schema defined above.
df_schema_new = spark.read.format("json").schema(_schema).load("data/order_singleline.json")
df_schema_new = spark.read.format("json").schema(_schema).load("data/order_singleline.json")
In [20]:
Copied!
# Confirm the DataFrame now carries the explicit schema (contact as array<string>).
df_schema_new.printSchema()
df_schema_new.printSchema()
root |-- contact: array (nullable = true) | |-- element: string (containsNull = true) |-- customer_id: string (nullable = true) |-- order_id: string (nullable = true) |-- order_line_items: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- amount: double (nullable = true) | | |-- item_id: string (nullable = true) | | |-- qty: long (nullable = true)
In [22]:
Copied!
# Read the raw JSON file as plain text: one row per line, single "value" column.
df = spark.read.format("text").load("data/order_singleline.json")
df = spark.read.format("text").load("data/order_singleline.json")
In [23]:
Copied!
# The text reader always yields a single string column named "value".
df.printSchema()
df.printSchema()
root |-- value: string (nullable = true)
In [24]:
Copied!
# Show the raw, unparsed JSON string held in the "value" column.
df.show(truncate=False)
df.show(truncate=False)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |value | +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |{"order_id":"O101","customer_id":"C001","order_line_items":[{"item_id":"I001","qty":6,"amount":102.45},{"item_id":"I003","qty":2,"amount":2.01}],"contact":[9000010000,9000010001]}| +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
In [25]:
Copied!
# Same complex DDL schema, reused to parse the raw JSON string with from_json.
# (The duplicated copy had its "<...>" generic parameters stripped — invalid DDL.)
_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"
_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"
In [26]:
Copied!
# from_json parses the JSON string column into a struct using the DDL schema.
from pyspark.sql.functions import from_json
df_expanded = df.withColumn("parsed", from_json(df.value, _schema))
from pyspark.sql.functions import from_json
df_expanded = df.withColumn("parsed", from_json(df.value, _schema))
In [27]:
Copied!
# The frame now has the raw "value" string plus the parsed struct column.
df_expanded.show()
df_expanded.show()
+--------------------+--------------------+ | value| parsed| +--------------------+--------------------+ |{"order_id":"O101...|{[9000010000, 900...| +--------------------+--------------------+
In [28]:
Copied!
# to_json does the reverse: serialize the parsed struct back into a JSON string.
from pyspark.sql.functions import to_json
df_unparsed = df_expanded.withColumn("unparsed", to_json(df_expanded.parsed))
from pyspark.sql.functions import to_json
df_unparsed = df_expanded.withColumn("unparsed", to_json(df_expanded.parsed))
In [31]:
Copied!
# Three columns side by side: raw value, parsed struct, re-serialized JSON.
df_unparsed.show()
df_unparsed.show()
+--------------------+--------------------+--------------------+ | value| parsed| unparsed| +--------------------+--------------------+--------------------+ |{"order_id":"O101...|{[9000010000, 900...|{"contact":["9000...| +--------------------+--------------------+--------------------+
In [33]:
Copied!
# Look at just the parsed struct column, untruncated.
df_unparsed.select("parsed").show(truncate=False)
df_unparsed.select("parsed").show(truncate=False)
+----------------------------------------------------------------------------+ |parsed | +----------------------------------------------------------------------------+ |{[9000010000, 9000010001], C001, O101, [{102.45, I001, 6}, {2.01, I003, 2}]}| +----------------------------------------------------------------------------+
In [32]:
Copied!
# Look at just the re-serialized JSON string column, untruncated.
df_unparsed.select("unparsed").show(truncate=False)
df_unparsed.select("unparsed").show(truncate=False)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |unparsed | +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |{"contact":["9000010000","9000010001"],"customer_id":"C001","order_id":"O101","order_line_items":[{"amount":102.45,"item_id":"I001","qty":6},{"amount":2.01,"item_id":"I003","qty":2}]}| +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
In [34]:
Copied!
# Get values from Parsed JSON
# "parsed.*" expands every field of the struct into a top-level column.
df_1 = df_expanded.select("parsed.*")
# Get values from Parsed JSON
# "parsed.*" expands every field of the struct into a top-level column.
df_1 = df_expanded.select("parsed.*")
In [35]:
Copied!
# The struct fields are now regular columns.
df_1.show()
df_1.show()
+--------------------+-----------+--------+--------------------+ | contact|customer_id|order_id| order_line_items| +--------------------+-----------+--------+--------------------+ |[9000010000, 9000...| C001| O101|[{102.45, I001, 6...| +--------------------+-----------+--------+--------------------+
In [36]:
Copied!
# Schema after flattening: arrays remain, but the top-level struct is gone.
df_1.printSchema()
df_1.printSchema()
root |-- contact: array (nullable = true) | |-- element: string (containsNull = true) |-- customer_id: string (nullable = true) |-- order_id: string (nullable = true) |-- order_line_items: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- amount: double (nullable = true) | | |-- item_id: string (nullable = true) | | |-- qty: long (nullable = true)
In [39]:
Copied!
# explode emits one row per array element; chaining two explodes yields the
# cross product of both arrays (2 line items x 2 contacts = 4 rows below).
from pyspark.sql.functions import explode
df_2 = df_1.withColumn("orders",explode("order_line_items")).withColumn("contact_details",explode("contact"))
from pyspark.sql.functions import explode
df_2 = df_1.withColumn("orders",explode("order_line_items")).withColumn("contact_details",explode("contact"))
In [40]:
Copied!
# One row per (line item, contact) combination after the double explode.
df_2.show()
df_2.show()
+--------------------+-----------+--------+--------------------+-----------------+---------------+ | contact|customer_id|order_id| order_line_items| orders|contact_details| +--------------------+-----------+--------+--------------------+-----------------+---------------+ |[9000010000, 9000...| C001| O101|[{102.45, I001, 6...|{102.45, I001, 6}| 9000010000| |[9000010000, 9000...| C001| O101|[{102.45, I001, 6...|{102.45, I001, 6}| 9000010001| |[9000010000, 9000...| C001| O101|[{102.45, I001, 6...| {2.01, I003, 2}| 9000010000| |[9000010000, 9000...| C001| O101|[{102.45, I001, 6...| {2.01, I003, 2}| 9000010001| +--------------------+-----------+--------+--------------------+-----------------+---------------+
In [43]:
Copied!
# Keep the scalar columns and flatten the exploded struct with "orders.*".
df_final = df_2.select("contact_details","customer_id","order_id","orders.*")
df_final = df_2.select("contact_details","customer_id","order_id","orders.*")
In [44]:
Copied!
# Fully flattened result: one scalar row per line item per contact.
df_final.show()
df_final.show()
+---------------+-----------+--------+------+-------+---+ |contact_details|customer_id|order_id|amount|item_id|qty| +---------------+-----------+--------+------+-------+---+ | 9000010000| C001| O101|102.45| I001| 6| | 9000010001| C001| O101|102.45| I001| 6| | 9000010000| C001| O101| 2.01| I003| 2| | 9000010001| C001| O101| 2.01| I003| 2| +---------------+-----------+--------+------+-------+---+
In [45]:
Copied!
# Stop the SparkSession and release its resources.
spark.stop()
spark.stop()
In [ ]:
Copied!