Delta Live Tables
In [0]:
# DLT works with 3 types of datasets:
# Streaming Tables (permanent / temporary) - used for append-only sources and incremental data
# Materialized Views - used for transformations, aggregations, or computations
# Views - used for intermediate transformations; not stored in the target schema
import dlt
The Delta Live Tables (DLT) module is not supported on this cluster.
You should either create a new pipeline or use an existing pipeline to run DLT code.
---------------------------------------------------------------------------
DLTImportException                        Traceback (most recent call last)
File <command-4525527394208293>, line 6
      1 # DLT works with 3 types of datasets
      2 # Streaming Tables (Permanent / Temporary) - Used as append data sources, Incremental data
      3 # Materialized Views - Used for transformations, aggregation or computation
      4 # Views - Used for intermediate transformations not stored in target schemas.
----> 6 import dlt

File /databricks/python_shell/lib/dbruntime/autoreload/discoverability/autoreload_discoverability_hook.py:96, in AutoreloadDiscoverabilityHook._patched_import(self, name, *args, **kwargs)
     90 if not self._should_hint and (
     91         (module := sys.modules.get(absolute_name)) is not None and
     92         (fname := get_allowed_file_name_or_none(module)) is not None and
     93         (mtime := os.stat(fname).st_mtime) > self.last_mtime_by_modname.get(
     94             absolute_name, float("inf")) and not self._should_hint):
     95     self._should_hint = True
---> 96 module = self._original_builtins_import(name, *args, **kwargs)
     97 if (fname := fname or get_allowed_file_name_or_none(module)) is not None:
     98     mtime = mtime or os.stat(fname).st_mtime

File /databricks/python_shell/lib/dbruntime/PostImportHook.py:239, in make_patched_exec_module.<locals>.patched_exec_module(module)
    236 @functools.wraps(exec_module)
    237 def patched_exec_module(module: ModuleType) -> None:
    238     try:
--> 239         exec_module(module)
    240         notify_module_loaded(module)
    241     except (ImportError, AttributeError):

File /databricks/spark/python/dlt/__init__.py:24
     21 # DLT python module is not supported when Spark Connect is enabled. So, we throw an error
     22 # here.
     23 if is_remote_spark:
---> 24     raise DLTImportException(
     25         "Delta Live Tables module is not supported on Spark Connect clusters. "
     26     )
     28 from dlt.api import *
     29 from dlt.usage_warnings import deprecated_function

DLTImportException: Delta Live Tables module is not supported on Spark Connect clusters.
In [0]:
_order_status = spark.conf.get("custom.orderStatus","NA")
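The custom.orderStatus key is read from the pipeline configuration. A minimal sketch of how it could be supplied (the value "O,F,P" is an assumed example, not from the source):

# Hypothetical sketch: the "custom.orderStatus" key would be supplied through the DLT
# pipeline settings ("configuration" block), shown here as a Python dict for illustration.
pipeline_configuration = {
    "custom.orderStatus": "O,F,P"  # assumed example value: comma-separated order statuses
}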
In [0]:
# rules for data quality (drop, fail, warn)
__order_rules = {
    "Valid Order Status": "o_orderstatus in ('O','F','P')",
    "Valid Order Price": "o_totalprice > 0"
}
__customer_rules = {
    "Valid Market Segment": "c_mktsegment is not null"
}
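The comment above lists the three enforcement modes for expectations. A minimal sketch of how each mode could be attached to a table, using the rule sets defined here; the table itself is hypothetical and not part of this pipeline, and only one mode would normally be chosen per rule set:

# Hypothetical table for illustration only.
@dlt.table(name = "orders_quality_demo", comment = "illustrates expectation modes")
@dlt.expect_all(__order_rules)             # warn: log violations, keep the rows
# @dlt.expect_all_or_drop(__order_rules)   # drop: discard rows violating any rule
# @dlt.expect_all_or_fail(__order_rules)   # fail: abort the update on any violation
def orders_quality_demo():
    return spark.readStream.table("LIVE.orders_bronze")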
In [0]:
# Create a Streaming Table
@dlt.table(
    table_properties = {"quality": "bronze"},
    comment = 'order bronze table'
)
def orders_bronze():
    df = spark.readStream.table("dev.bronze.orders_raw")
    return df
In [0]:
# Create Orders Autoloader table
@dlt.table(
    # pipeline.reset.allowed = false makes sure this table is not refreshed during a full refresh
    table_properties = {"quality": "bronze", "pipeline.reset.allowed": "false"},
    comment = 'order bronze table',
    name = 'orders_autoloader_bronze'
)
def func():
    df = (
        spark.readStream.format('cloudFiles')
        .option('cloudFiles.schemaHints', 'o_orderkey long,o_custkey long,o_orderstatus string,o_totalprice decimal(18,2),o_orderdate date,o_orderpriority string,o_clerk string,o_shippriority integer,o_comment string')
        .option('cloudFiles.schemaLocation', '/Volumes/dev/etl/landing/autoloader/schemas/1')
        .option('cloudFiles.format', 'CSV')
        .option('pathGlobFilter', '*.csv')
        .option('cloudFiles.schemaEvolutionMode', 'none')
        .load('/Volumes/dev/etl/landing/files')
    )
    return df
In [0]:
# Union two or more streaming tables into a single target
dlt.create_streaming_table("orders_union_bronze")

# Append flow 1: delta streaming source
@dlt.append_flow(
    target = "orders_union_bronze"
)
def orders_delta_append():
    df = spark.readStream.table("LIVE.orders_bronze")
    return df

# Append flow 2: autoloader source
@dlt.append_flow(
    target = "orders_union_bronze"
)
def orders_autoloader_append():
    df = spark.readStream.table("LIVE.orders_autoloader_bronze")
    return df
In [0]:
# # Create materialized view for customer
# @dlt.table(
#     comment = 'customer bronze table'
# )
# def customer_bronze():
#     df = spark.read.table("dev.bronze.customers_raw")
#     return df
In [0]:
# Create a view over the customer raw data (streaming source for the CDC steps below)
@dlt.view(
    comment = 'customer bronze view'
)
def customer_bronze_vw():
    df = spark.readStream.table("dev.bronze.customers_raw")
    return df
In [0]:
# SCD Type 1: apply CDC changes from the customer view into a streaming table
from pyspark.sql.functions import expr

dlt.create_streaming_table("customer_scd1_bronze")

dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_bronze_vw",
    keys = ['c_custkey'],
    sequence_by = "_src_insert_dt",
    apply_as_deletes = expr("_src_action = 'D'"),
    apply_as_truncates = expr("_src_action = 'T'"),
    except_column_list = ['_src_action', '_src_insert_dt']
)
In [0]:
# SCD Type 2: keep the full history of customer changes
from pyspark.sql.functions import expr

dlt.create_streaming_table("customer_scd2_bronze")

dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_bronze_vw",
    keys = ['c_custkey'],
    sequence_by = "_src_insert_dt",
    stored_as_scd_type = 2,
    except_column_list = ['_src_action', '_src_insert_dt']
)
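With stored_as_scd_type = 2, apply_changes adds __START_AT and __END_AT validity columns to the target; a row whose __END_AT is null is the current version. A minimal sketch of exposing the current snapshot (the view name is hypothetical and not part of this pipeline):

# Hypothetical helper view: the open (current) record per customer from the SCD2 table.
@dlt.view(comment = 'current customer snapshot (illustrative only)')
def customer_current_vw():
    return spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")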
In [0]:
# Create a view to join orders with customers
@dlt.view(
    comment = 'Joined View'
)
@dlt.expect_all(__order_rules)
@dlt.expect_all(__customer_rules)
def joined_vw():
    df_c = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")
    df_o = spark.read.table("LIVE.orders_union_bronze")
    return df_o.join(df_c, how = 'left_outer', on = df_c.c_custkey == df_o.o_custkey)
In [0]:
# Create a materialized view that adds an insert-date column
from pyspark.sql.functions import current_timestamp, count, sum

@dlt.table(
    table_properties = {"quality": "silver"},
    comment = "Joined_Table",
    name = "joined_silver"
)
def joined_silver():
    df = spark.read.table("LIVE.joined_vw").withColumn("_insert_date", current_timestamp())
    return df
In [0]:
# Create an aggregated gold table across all order statuses
@dlt.table(
    table_properties = {"quality": "gold"},
    comment = "order aggregated table",
    name = "orders_agg_gold"
)
def orders_aggregated_gold():
    df = spark.read.table("LIVE.joined_silver")
    df_final = df.groupBy("c_mktsegment").agg(count('o_orderkey').alias("count_of_orders"), sum("o_totalprice").alias('sum_totalprice')).withColumn("_insert_date", current_timestamp())
    return df_final
In [0]:
# Create one gold table per order status from the pipeline configuration.
# A helper function binds the status value per iteration; DLT evaluates the
# dataset functions lazily, so reading the loop variable directly would give
# every table the last status value.
def generate_status_gold(_status):
    @dlt.table(
        table_properties = {"quality": "gold"},
        comment = "order aggregated table",
        name = f"orders_agg_{_status}_gold"
    )
    def orders_aggregated_gold():
        df = spark.read.table("LIVE.joined_silver")
        df_final = df.where(f"o_orderstatus = '{_status}'").groupBy("c_mktsegment").agg(count('o_orderkey').alias("count_of_orders"), sum("o_totalprice").alias('sum_totalprice')).withColumn("_insert_date", current_timestamp())
        return df_final

for _status in _order_status.split(","):
    generate_status_gold(_status)
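For example, if custom.orderStatus were set to "O,F,P" in the pipeline configuration (an assumed value), this loop would register orders_agg_O_gold, orders_agg_F_gold, and orders_agg_P_gold, each aggregating only the orders with that status.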