Code Examples
Delta Live Tables Code Walkthrough¶
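All of the snippets below run inside a DLT pipeline notebook and assume these imports (spark is provided by the runtime):
import dlt
from pyspark.sql.functions import count, sum, expr, current_timestamp   # Spark SQL functions used below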
- Create Streaming Table for Orders
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Orders Bronze Table"
)
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df
- Create Materialized View for Customers
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Customers Materialized View"
)
def customers_bronze():
    df = spark.read.table("dev.bronze.customers_raw")
    return df
- Create a view that joins the above streaming table and materialized view
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customers_bronze")
    df_o = spark.read.table("LIVE.orders_bronze")
    df_join = df_o.join(df_c, how = "left_outer", on = df_c.c_custkey == df_o.o_custkey)
    return df_join
- Create a silver table from the joined view, adding an _insertdate column
@dlt.table(
    table_properties = {"quality":"silver"},
    comment = "joined table",
    name = "joined_silver"
)
def joined_silver():
    df = spark.read.table("LIVE.joined_vw").withColumn("_insertdate", current_timestamp())
    return df
- Create gold level aggregation
@dlt.table(
    table_properties = {"quality":"gold"},
    comment = "orders aggregated table"
)
def orders_aggregated_gold():
    df = spark.read.table("LIVE.joined_silver")
    df_final = (
        df.groupBy("c_mktsegment")
          .agg(count("o_orderkey").alias("sum_orders"))
          .withColumn("_insertdate", current_timestamp())
    )
    return df_final
Deleting DLT Pipeline¶
The tables / datasets in DLT are managed by and linked to the DLT pipeline, so if we delete a pipeline, all of them get dropped.
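Besides the UI, a pipeline can be deleted programmatically; a minimal sketch against the Pipelines REST API, where the workspace URL, token and pipeline id are placeholders you must supply:
import requests

host = "https://<workspace-url>"           # placeholder
token = "<personal-access-token>"          # placeholder
pipeline_id = "<pipeline-id>"              # placeholder

# Deleting the pipeline also drops the streaming tables / materialized views it manages
resp = requests.delete(
    f"{host}/api/2.0/pipelines/{pipeline_id}",
    headers = {"Authorization": f"Bearer {token}"}
)
resp.raise_for_status()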
Incremental Load in DLT¶
When we inserted 10k new records into the raw orders source, only those records were ingested into orders_bronze on the next update, not the entire table.
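A quick way to reproduce this, assuming a hypothetical staging table with the same schema as the raw orders source:
# dev.bronze.orders_staging is a hypothetical table holding extra rows for the demo
spark.sql("""
    INSERT INTO dev.bronze.orders_raw
    SELECT * FROM dev.bronze.orders_staging LIMIT 10000
""")
# On the next pipeline update, orders_bronze reads only these new rows,
# using the checkpoint state kept by the streaming table.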
Adding New Column¶
@dlt.table(
    table_properties = {"quality":"gold"},
    comment = "orders aggregated table"
)
def orders_aggregated_gold():
    df = spark.read.table("LIVE.joined_silver")
    df_final = (
        df.groupBy("c_mktsegment")
          .agg(count("o_orderkey").alias("sum_orders"),
               sum("o_totalprice").alias("sum_price"))   # new column added to the aggregation
          .withColumn("_insertdate", current_timestamp())
    )
    return df_final
We don't have to manipulate any DDL; the DLT pipeline automatically detects the addition of the new column.
Renaming Tables¶
We just change the name of the function (or the name argument) in the table declaration and the table is renamed; the catalog also reflects the new name.
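For example, to rename the bronze orders table, either rename the decorated function or set name explicitly; the new name below is hypothetical:
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Orders Bronze Table",
    name = "orders_bronze_v2"   # hypothetical new name; overrides the function name
)
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df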
DLT Internals¶
Every streaming table and materialized view is backed by underlying tables in the _databricks_internal schema, and each has a table_id associated with it.
If we open these tables in the storage account, we can see checkpoints that keep track of incremental data changes.
Data Lineage¶
DLT Append Flow and Autoloader¶
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "orders autoloader",
    name = "orders_autoloader_bronze"
)
def func():
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "...")
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaEvolutionMode", "none")
        .load("/Volumes/etl/landing/files")
    )
    return df
dlt.create_streaming_table("orders_union_bronze")

@dlt.append_flow(
    target = "orders_union_bronze"
)
def order_delta_append():
    df = spark.readStream.table("LIVE.orders_bronze")
    return df

@dlt.append_flow(
    target = "orders_union_bronze"
)
def order_autoloader_append():
    df = spark.readStream.table("LIVE.orders_autoloader_bronze")
    return df
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customers_bronze")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    df_join = df_o.join(df_c, how = "left_outer", on = df_c.c_custkey == df_o.o_custkey)
    return df_join
Custom Configuration¶
Define the parameter in the pipeline's Configuration settings, read it in code, and use it to generate one gold table per order status (see the sketch below).
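A minimal sketch of reading the parameter; the key name custom.order_status is an assumption for illustration:
# Read the pipeline configuration value, e.g. custom.order_status = "O,F,P"
_order_status = spark.conf.get("custom.order_status", "O,F,P")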
Because the decorated table functions are evaluated after the loop completes, wrap the definition in a helper so each table captures its own status value:
def generate_orders_agg_gold(_status):
    # create one gold table per order status, e.g. orders_agg_O_gold
    @dlt.table(
        table_properties = {"quality":"gold"},
        comment = "order aggregated table",
        name = f"orders_agg_{_status}_gold"
    )
    def orders_aggregated_gold():
        df = spark.read.table("LIVE.joined_silver")
        df_final = (
            df.where(f"o_orderstatus = '{_status}'")
              .groupBy("c_mktsegment")
              .agg(count("o_orderkey").alias("count_of_orders"),
                   sum("o_totalprice").alias("sum_totalprice"))
              .withColumn("_insert_date", current_timestamp())
        )
        return df_final

for _status in _order_status.split(","):
    generate_orders_agg_gold(_status)
DLT SCD1 and SCD2¶
Prerequisites
Input Source Table
@dlt.view(
    comment = "Customer Bronze streaming view"
)
def customer_bronze_vw():
    df = spark.readStream.table("dev.bronze.customers_raw")
    return df
SCD Type 1 Table
dlt.create_streaming_table("customer_scd1_bronze")

dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_bronze_vw",
    keys = ["c_custkey"],
    stored_as_scd_type = 1,
    apply_as_deletes = expr("__src_action = 'D'"),
    apply_as_truncates = expr("__src_action = 'T'"),
    sequence_by = "__src_insert_dt"
)
SCD Type 2 Table
dlt.create_streaming_table("customer_scd2_bronze")

dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_bronze_vw",
    keys = ["c_custkey"],
    stored_as_scd_type = 2,
    except_column_list = ["__src_action", "__src_insert_dt"],
    sequence_by = "__src_insert_dt"
)
Changes in the joined view to use the SCD2 table
@dlt.view(
    comment = "Joined View"
)
def joined_vw():
    df_c = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    df_join = df_o.join(df_c, how = "left_outer", on = df_c.c_custkey == df_o.o_custkey)
    return df_join
After inserting a record with an update, the __END_AT for the new version is null, signifying it is the latest version, while the previous version is closed with an __END_AT timestamp. In SCD Type 1 only the latest update is captured, with no history.
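To inspect the history, query the SCD2 table directly; dev.etl below is a hypothetical target catalog/schema for the pipeline:
# __START_AT / __END_AT are added by apply_changes for SCD Type 2 targets
spark.sql("""
    SELECT c_custkey, c_mktsegment, __START_AT, __END_AT
    FROM dev.etl.customer_scd2_bronze
    ORDER BY c_custkey, __START_AT
""").show(truncate = False)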
Insert Old Timestamp record¶
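A sketch of the scenario, assuming the raw customers table has at least the columns referenced above (remaining columns omitted and assumed nullable); the key, segment and timestamp values are illustrative:
# An out-of-order change: __src_insert_dt is older than the current record's timestamp
spark.sql("""
    INSERT INTO dev.bronze.customers_raw (c_custkey, c_mktsegment, __src_action, __src_insert_dt)
    VALUES (1, 'MACHINERY', 'U', TIMESTAMP '2020-01-01 00:00:00')
""")
# Because apply_changes sequences by __src_insert_dt, this older record does not become
# the current version; in the SCD2 target it can only be recorded as history.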
SCD Type 1 vs SCD Type 2 Delete Records¶
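A sketch of a source-side delete record, using the __src_action flag that the SCD1 configuration above maps to deletes (values again illustrative):
# 'D' matches apply_as_deletes = expr("__src_action = 'D'") in the SCD1 apply_changes call
spark.sql("""
    INSERT INTO dev.bronze.customers_raw (c_custkey, c_mktsegment, __src_action, __src_insert_dt)
    VALUES (1, 'MACHINERY', 'D', current_timestamp())
""")
# SCD1 target: the row for c_custkey = 1 is deleted.
# SCD2 target (configured here without apply_as_deletes): the row is applied as a normal change.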
Rules for Data Quality: Warn, Drop and Fail¶
Defining the Rules
__order_rules = {
    "Valid Order Status" : "o_orderstatus in ('O','F','P')",
    "Valid Order Price" : "o_totalprice > 0"
}

__customer_rules = {
    "valid market segment" : "c_mktsegment is not null"
}
Adding the rules
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Orders Bronze Table"
)
@dlt.expect_all(__order_rules)   # warn
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df

@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Customers Materialized View"
)
@dlt.expect_all(__customer_rules)   # warn
def customers_bronze():
    df = spark.read.table("dev.bronze.customers_raw")
    return df
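The heading also mentions Drop and Fail; those map to the other expectation decorators. A sketch applying them to the same bronze tables, shown separately from the pipeline above for illustration:
@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Orders Bronze Table"
)
@dlt.expect_all_or_drop(__order_rules)     # drop: violating records are removed from the target
def orders_bronze():
    return spark.readStream.table("dev.bronze.orders_raw")

@dlt.table(
    table_properties = {"quality":"bronze"},
    comment = "Customers Materialized View"
)
@dlt.expect_all_or_fail(__customer_rules)  # fail: any violation fails the pipeline update
def customers_bronze():
    return spark.read.table("dev.bronze.customers_raw")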
Edge Case¶
The number of failed records here is 2, but only one record in the source table was flawed; because there are two consumers of the table, the expectation reports 2 failed records.
Using Expectations on the view¶
Even though we can see above that the market segment is null, the joined view does not have details for customer 99999 (its record failed the expectation and was dropped upstream), and since we are doing a left join, there were no failed records at all.
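A minimal sketch of what attaching the customer rules to the joined view itself looks like, reusing the rule set defined above:
@dlt.view(
    comment = "Joined View"
)
@dlt.expect_all(__customer_rules)   # expectations can be attached to views as well
def joined_vw():
    df_c = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    return df_o.join(df_c, how = "left_outer", on = df_c.c_custkey == df_o.o_custkey)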