In [1]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading Complex Data Formats")
    .master("local[*]")
    .getOrCreate()
)
spark
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/15 16:23:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [9]:
df_parquet = spark.read.format("parquet").load("data/sales_data.parquet")
In [10]:
df_parquet.printSchema()
root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)
Parquet stores the schema and other metadata along with the file, so Spark picks up the column names and types without us having to supply or infer a schema.
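For contrast, a schema-less format like CSV carries no such metadata, so we would have to supply a schema (or pay for an extra pass with inferSchema). A minimal sketch, assuming a hypothetical data/sales_data.csv export of the same dataset:
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               IntegerType, StringType, DoubleType)

# Explicit schema - CSV files do not embed one, unlike Parquet/ORC
sales_schema = StructType([
    StructField("transacted_at", TimestampType(), True),
    StructField("trx_id", IntegerType(), True),
    StructField("retailer_id", IntegerType(), True),
    StructField("description", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("city_id", IntegerType(), True),
])

df_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(sales_schema)           # required here; inferSchema would need a full extra read
    .load("data/sales_data.csv")    # hypothetical path, for illustration only
)
df_csv.printSchema()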
In [11]:
df_parquet.show(truncate=False)
+-------------------+----------+-----------+-----------------------------------------------+-------+----------+
|transacted_at      |trx_id    |retailer_id|description                                    |amount |city_id   |
+-------------------+----------+-----------+-----------------------------------------------+-------+----------+
|2017-11-24 19:00:00|1995601912|2077350195 |Walgreen 11-25                                 |197.23 |216510442 |
|2017-11-24 19:00:00|1734117021|644879053  |unkn ppd id: 768641 11-26                      |8.58   |930259917 |
|2017-11-24 19:00:00|1734117022|847200066  |Wal-Mart ppd id: 555914 Algiers 11-26          |1737.26|1646415505|
|2017-11-24 19:00:00|1734117030|1953761884 |Home Depot ppd id: 265293 11-25                |384.5  |287177635 |
|2017-11-24 19:00:00|1734117089|1898522855 |Target 11-25                                   |66.33  |1855530529|
|2017-11-24 19:00:00|1734117117|997626433  |Sears ppd id: 856095 Ashgabat                  |298.87 |957346984 |
|2017-11-24 19:00:00|1734117123|1953761884 |unkn ppd id: 153174 Little Rock 11-25          |19.55  |45522086  |
|2017-11-24 19:00:00|1734117152|1429095612 |Ikea arc id: 527956 Saint John's 11-26         |9.39   |1268541279|
|2017-11-24 19:00:00|1734117153|847200066  |unkn Kingstown                                 |2907.57|1483931123|
|2017-11-24 19:00:00|1734117212|1996661856 |unkn ppd id: 454437 11-24                      |140.38 |336763936 |
|2017-11-24 19:00:00|1734117241|486576507  |iTunes                                         |2912.67|1663872965|
|2017-11-24 19:00:00|2076947148|847200066  |Wal-Mart 11-24                                 |62.83  |1556600840|
|2017-11-24 19:00:00|2076947147|562903918  |McDonald's ccd id: 135878 Ljubljana 11-24      |31.37  |930259917 |
|2017-11-24 19:00:00|2076947146|511877722  |unkn ccd id: 598521 Ankara 11-26               |1915.35|1698762556|
|2017-11-24 19:00:00|2076947113|1996661856 |AutoZone arc id: 998454 11-25                  |1523.6 |1759612211|
|2017-11-24 19:00:00|2076947018|902350112  |DineEquity arc id: 1075293                     |22.28  |2130657559|
|2017-11-24 19:00:00|2076946994|1898522855 |Target ppd id: 336785                          |2589.93|2074005445|
|2017-11-24 19:00:00|2076946985|847200066  |Wal-Mart ppd id: 252763 11-26                  |42.2   |459344513 |
|2017-11-24 19:00:00|2076946960|386167994  |Wendy's ppd id: 881511 11-24                   |14.62  |352952442 |
|2017-11-24 19:00:00|2076946954|486576507  |iTunes ppd id: 121397                          |37.42  |485114748 |
+-------------------+----------+-----------+-----------------------------------------------+-------+----------+
only showing top 20 rows
In [13]:
df_orc = spark.read.format("orc").load("data/sales_data.orc")
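ORC is columnar as well and, like Parquet, embeds its schema and statistics in the file, so the same schema comes back with no extra work. A quick check, plus a hedged sketch of writing the data back out in ORC (the output path below is illustrative, not from the original notebook):
df_orc.printSchema()

# Write the Parquet DataFrame back out in ORC format (illustrative output path)
(
    df_parquet.write
    .format("orc")
    .mode("overwrite")
    .save("data/output/sales_data_orc/")
)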
In [14]:
# Benefits of Columnar Storage
# Let's create a simple Python decorator - {get_time} - to get the execution timings
# If you don't know about Python decorators - check out: https://www.geeksforgeeks.org/decorators-in-python/
import time

def get_time(func):
    def inner_get_time() -> str:
        start_time = time.time()
        func()
        end_time = time.time()
        return f"Execution time: {(end_time - start_time) * 1000} ms"
    print(inner_get_time())  # runs the function and prints the timing as soon as it is decorated
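Note that get_time never returns inner_get_time, so applying the decorator runs the function once immediately and prints the timing (the decorated name ends up as None). A tiny standalone check of that behaviour, independent of Spark:
@get_time
def slow_step():
    time.sleep(0.2)  # placeholder work - prints roughly "Execution time: 200 ms" right away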
In [16]:
@get_time
def x():
    df = spark.read.format("parquet").load("data/sales_data.parquet")
    df.count()
Execution time: 791.6536331176758 ms
In [18]:
@get_time
def x():
    df = spark.read.format("parquet").load("data/sales_data.parquet")
    df.select("trx_id").count()
Execution time: 255.80644607543945 ms
Observe that the read takes significantly less time when we select just one column - because Parquet is columnar, Spark only has to read the trx_id column instead of all six.
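To see where the saving comes from, inspect the physical plan - the FileScan node's ReadSchema should list only the selected column (exact plan output varies by Spark version):
df = spark.read.format("parquet").load("data/sales_data.parquet")
df.select("trx_id").explain()  # look for ReadSchema: struct<trx_id:int> in the FileScan parquet node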
In [ ]:
df_1 = spark.read.format("parquet").option("recursiveFileLookup", True).load("data/input/sales_recursive/")
df_1.show()
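recursiveFileLookup makes Spark descend into every sub-directory under the given path (it also disables partition discovery). If the nested folders contain a mix of file types, it can be combined with the pathGlobFilter option to pick up only the files you want - a small sketch, assuming the same nested layout as above:
df_2 = (
    spark.read
    .format("parquet")
    .option("recursiveFileLookup", True)
    .option("pathGlobFilter", "*.parquet")  # only load files whose names end in .parquet
    .load("data/input/sales_recursive/")
)
df_2.show()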