
Code Examples

Delta Live Tables Code Walkthrough

  1. Create Streaming Table for Orders
# imports used throughout this walkthrough
import dlt
from pyspark.sql.functions import count, sum, expr, current_timestamp

@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Orders Bronze Table"
)
def orders_bronze():
  df = spark.readStream.table("dev.bronze.orders_raw")
  return df
  2. Create Materialized View for Customers
@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Customers Materialized View"
)
def customers_bronze():
  df = spark.read.table("dev.bronze.customers_raw")
  return df
  3. Create a view that joins the above streaming table and materialized view
@dlt.view(
  comment = 'Joined View'
)
def joined_vw():
  df_c = spark.read.table("LIVE.customers_bronze")
  df_o = spark.read.table("LIVE.orders_bronze")

  df_join = df_o.join(df_c,how = "left_outer",on = df_c.c_custkey==df_o.o_custkey)

  return df_join  
  4. Add a new column to the view
@dlt.table(
  table_properties = {"quality":"silver"},
  comment = "joined table",
  name = 'joined_silver'
)
def joined_silver():
  df = spark.read.table("LIVE.joined_vw").withColumn("_insertdate",current_timestamp())
  return df
  5. Create gold-level aggregation
@dlt.table(
  table_properties = {"quality":"gold"},
  comment = "orders aggregated table",
)
def orders_aggregated_gold():
  df = spark.read.table("LIVE.joined_silver")

  df_final = df.groupBy('c_mktsegment').agg(count('o_orderkey').alias('sum_orders')).withColumn('_insertdate', current_timestamp())
  return df_final

image

Deleting DLT Pipeline

The tables / datasets in DLT are managed and linked to the DLT pipeline, so if we delete a pipeline, all of them get dropped.

Incremental Load in DLT

When we inserted 10k new records into the source, only those records got ingested into orders_bronze; the entire table was not reprocessed.
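
A test insert might look like the sketch below, assuming the new rows land in dev.bronze.orders_raw (the source of orders_bronze); the staging table name is hypothetical.

# Hypothetical test insert: push ~10k new rows into the raw source table.
# On the next pipeline update only these rows flow into orders_bronze.
spark.sql("""
    INSERT INTO dev.bronze.orders_raw
    SELECT * FROM dev.bronze.orders_staging   -- hypothetical staging table
    LIMIT 10000
""")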

image

Adding New Column

@dlt.table(
  table_properties = {"quality":"gold"},
  comment = "orders aggregated table",
)
def orders_aggregated_gold():
  df = spark.read.table("LIVE.joined_silver")

  df_final = df.groupBy('c_mktsegment').agg(count('o_orderkey').alias('sum_orders'), sum('o_totalprice').alias('sum_price')).withColumn('_insertdate', current_timestamp())
  return df_final

We don't have to manipulate any DDL; the DLT pipeline auto-detects the addition of the new column.

Renaming Tables

We just change the name of the function in the table declaration and the table gets renamed. The catalog also reflects this.
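
For example, a sketch of renaming the silver table (the new name joined_silver_renamed is just an illustration; changing either the function name or the explicit name argument has the same effect):

@dlt.table(
  table_properties = {"quality":"silver"},
  comment = "joined table",
  name = 'joined_silver_renamed'  # hypothetical new name; without name=, the function name is used
)
def joined_silver_renamed():
  df = spark.read.table("LIVE.joined_vw").withColumn("_insertdate", current_timestamp())
  return df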

DLT Internals

Every streaming table and materialized view is backed by underlying tables in the _databricks_internal schema.

image

Each of them has a table_id associated with it.

If we go to these tables in the storage account, we can see checkpoints that keep track of incremental data changes.
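
A rough way to peek at those checkpoint folders from a notebook; the path below is a placeholder, the real one comes from the pipeline's storage location:

# List checkpoint folders for a DLT-managed table (placeholder path; use the
# pipeline's actual storage location from its settings).
for f in dbutils.fs.ls("abfss://<container>@<account>.dfs.core.windows.net/<pipeline-root>/checkpoints"):
    print(f.path)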

image

Data Lineage

image

DLT Append Flow and Autoloader

@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "orders autoloader",
  name = "orders_autoloader_bronze"
)
def func():
  df = (
      spark.readStream
      .format("cloudFiles")
      .option("cloudFilesFormat","CSV")
      .option("cloudFiles.schemaLocation","...")
      .option("pathGlobFilter","*.csv")
      .option("cloudFiles.schemaEvolutionMode","none")
      .load("/Volumes/etl/landing/files")
  )
  return df

dlt.create_streaming_table("orders_union_bronze")

@dlt.append_flow(
  target = "orders_union_bronze"
)
def order_delta_append():
  df = spark.readStream.table("LIVE.orders_bronze")
  return df

@dlt.append_flow(
  target = "orders_union_bronze"
)
def order_autoloader_append():
  df = spark.readStream.table("LIVE.orders_autoloader_bronze")
  return df

@dlt.view(
  comment = 'Joined View'
)
def joined_vw():
  df_c = spark.read.table("LIVE.customers_bronze")
  df_o = spark.read.table("LIVE.orders_union_bronze")

  df_join = df_o.join(df_c,how = "left_outer",on = df_c.c_custkey==df_o.o_custkey)

  return df_join  

image

Custom Configuration

image

Use this parameter in the code:

_order_status = spark.conf.get("custom.orderStatus","_NA")

for _status in _order_status.split(","):
    # create one gold table per order status
    @dlt.table(
        table_properties = {"quality":"gold"},
        comment = "order aggregated table",
        name = f"orders_agg_{_status}_gold"
    )
    def orders_aggregated_gold(_status=_status):  # bind the current loop value, not the last one
        df = spark.read.table("LIVE.joined_silver")
        df_final = (
            df.where(f"o_orderstatus = '{_status}'")
              .groupBy("c_mktsegment")
              .agg(count('o_orderkey').alias("count_of_orders"), sum("o_totalprice").alias('sum_totalprice'))
              .withColumn("_insert_date", current_timestamp())
        )

        return df_final
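
With custom.orderStatus set to, for example, O,F,P in the pipeline configuration, this loop registers orders_agg_O_gold, orders_agg_F_gold and orders_agg_P_gold.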

image

DLT SCD1 and SCD2

Pre Requisites

image

Input Source Table

@dlt.view(
  comment = "Customer Bronze streaming view"
)
def customer_bronze_vw():
  df = spark.readStream.table("dev.bronze.customers_raw")
  return df

SCD Type1 Table

dlt.create_streaming_table('customer_scd1_bronze')

dlt.apply_changes(
  target = "customer_scd1_bronze",
  source = "customer_bronze_vw",
  keys = ['c_custkey'],
  stored_as_scd_type = 1,
  apply_as_deletes = expr("__src_action = 'D'"),
  apply_as_truncates = expr("__src_action = 'T'"),
  sequence_by = "__src_insert_dt"
)

SCD Type 2 Table

dlt.create_streaming_table('customer_scd2_bronze')

dlt.apply_changes(
  target = "customer_scd1_bronze",
  source = "customer_bronze_vw",
  keys = ['c_custkey'],
  stored_as_scd_type = 2,
  except_column_list = ['__src_action','__src_insert_dt'],
  sequence_by = "__src_insert_dt"
)

Changes in the view to make SCD2 applicable

@dlt.view(
  comment = 'Joined View'
)
def joined_vw():
  df_c = spark.read.table("LIVE.customers_scd2_bronze").where("__END_AT is null")
  df_o = spark.read.table("LIVE.orders_union_bronze")

  df_join = df_o.join(df_c,how = "left_outer",on = df_c.c_custkey==df_o.o_custkey)

  return df_join  

image

After inserting a record with an update, the __END_AT for the new version is null, signifying it is the latest update.

image

In SCD Type 1, just the latest update is captured.

image

Inserting a Record with an Old Timestamp

image

SCD Type 1 vs SCD Type 2: Deleting Records

image

image

Rules for Data Quality: Warn, Drop and Fail

Defining the Rules

__order_rules = {
  "Valid Order Status" : "o_order_status in ('O','F','P')",
  "Valid Order Price" : "o_orderprice > 0"
}

__customer_rules = {
  "valid market segment" : "c_mktsegment is not null"
}

Adding the rules

@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Orders Bronze Table"
)
@dlt.expect_all(__order_rules) # warn
def orders_bronze():
  df = spark.readStream.table("dev.bronze.orders_raw")
  return df

@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Customers Materialized View"
)
@dlt.expect_all(__customer_rules) # warn
def customers_bronze():
  df = spark.read.table("dev.bronze.customers_raw")
  return df
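
Only the warn behaviour (expect_all) is shown above. Below is a minimal sketch of the drop and fail variants, reusing the same rule dictionary on hypothetical demo tables:

# Drop: rows violating any rule are dropped from the target table
@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Orders Bronze Table with drop expectations",
  name = "orders_bronze_drop_demo"  # hypothetical name, for illustration only
)
@dlt.expect_all_or_drop(__order_rules)
def orders_bronze_drop_demo():
  return spark.readStream.table("dev.bronze.orders_raw")

# Fail: a single violating row fails the pipeline update
@dlt.table(
  table_properties = {"quality":"bronze"},
  comment = "Orders Bronze Table with fail expectations",
  name = "orders_bronze_fail_demo"  # hypothetical name, for illustration only
)
@dlt.expect_all_or_fail(__order_rules)
def orders_bronze_fail_demo():
  return spark.readStream.table("dev.bronze.orders_raw")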

image

Edge Case

image

The number of failed records here is 2, even though only one record in the source table was flawed; since there are two consumers, it shows up as 2 failed records.

Using Expectations on the view

image

image

Even though we can see above that the market segment is null, the joined view has no details for customer 99999 (that record failed the expectation and was dropped), and because we are doing a left join there were no failed records at all.
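
A sketch of what attaching the customer rule to the joined view could look like (same joined_vw as above, with the warn-style expect_all):

@dlt.view(
  comment = 'Joined View'
)
@dlt.expect_all(__customer_rules)  # warn on joined rows with a null market segment
def joined_vw():
  df_c = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")
  df_o = spark.read.table("LIVE.orders_union_bronze")

  df_join = df_o.join(df_c, how = "left_outer", on = df_c.c_custkey == df_o.o_custkey)

  return df_join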

image

Monitoring and Observability

Check this link