Auto Loader Rescue
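This notebook walks through Auto Loader's rescue schema-evolution mode: records whose fields do not match the expected schema are captured in a _rescued_data column instead of failing the stream, so no data is silently dropped.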
In [0]:
%python
# DBFS paths (change as needed)
input_path = "dbfs:/tmp/autoloader_demo/input"
schema_loc = "dbfs:/tmp/autoloader_demo/_schemas" # Auto Loader schema metadata
checkpoint = "dbfs:/tmp/autoloader_demo/_checkpoint"
delta_target = "dbfs:/tmp/autoloader_demo/bronze_delta"
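If you re-run the demo end to end, the schema location, checkpoint, and target directory also carry state from previous runs. A minimal reset sketch, assuming you want a completely fresh start (destructive, demo use only):

%python
# Optional: wipe Auto Loader state and the target for a clean re-run (demo only)
for p in (schema_loc, checkpoint, delta_target):
    dbutils.fs.rm(p, recurse=True)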
In [0]:
%python
# (Databricks) create the input directory and write example JSON files
dbutils.fs.rm(input_path, recurse=True)  # optional cleanup for the demo
dbutils.fs.mkdirs(input_path)

# 1) original file (matches the expected schema)
dbutils.fs.put(f"{input_path}/file1.json",
               '{"id": 1, "name": "Alice", "age": 30}\n', True)

# 2) a new column (email) appears later -> rescued into _rescued_data
dbutils.fs.put(f"{input_path}/file2.json",
               '{"id": 2, "name": "Bob", "age": 25, "email": "bob@example.com"}\n', True)

# 3) type mismatches (id as a string, age as a non-integer) -> rescued
dbutils.fs.put(f"{input_path}/file3.json",
               '{"id": "3", "name": "Charlie", "age": "not_a_number"}\n', True)

# 4) field names that differ from the schema only by case -> rescued
dbutils.fs.put(f"{input_path}/file4.json",
               '{"Id": 4, "Name": "Dave", "AGE": 40}\n', True)
Wrote 38 bytes. Wrote 64 bytes. Wrote 54 bytes. Wrote 37 bytes.
Out[0]:
True
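A quick sanity check that all four files landed where the stream expects them, using the standard dbutils.fs.ls utility:

%python
# List the demo input files before starting the stream
display(dbutils.fs.ls(input_path))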
In [0]:
%python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, get_json_object

expected_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Optional: stop Auto Loader from recording the source file path in _rescued_data:
# spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

df = (spark.readStream
      .format("cloudFiles")
      .schema(expected_schema)                                  # the known base schema
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", schema_loc)          # required: Auto Loader state
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.schemaEvolutionMode", "rescue")       # << rescue mode
      .option("cloudFiles.rescuedDataColumn", "_rescued_data")  # name of the rescue column
      .load(input_path))

# write to a Delta path (bronze layer)
query = (df.writeStream
         .format("delta")
         .option("checkpointLocation", checkpoint)
         .trigger(availableNow=True)
         .outputMode("append")
         .start(delta_target))

# list active streaming queries (optional)
spark.streams.active
Out[0]:
[<pyspark.sql.connect.streaming.query.StreamingQuery at 0xffff08846330>]
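Because the query uses trigger(availableNow=True), it processes the current backlog and then stops on its own. If the next cell runs too early it can read an incomplete table, so it is worth blocking until the batch finishes:

%python
# Block until the availableNow batch has drained the input directory
query.awaitTermination()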
In [0]:
Copied!
%python
bronze = spark.read.format("delta").load(delta_target)
bronze.printSchema()
bronze.show(truncate=False)
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- _rescued_data: string (nullable = true)

+----+-------+----+-----------------------------------------------------------------------------------------+
|id  |name   |age |_rescued_data                                                                            |
+----+-------+----+-----------------------------------------------------------------------------------------+
|2   |Bob    |25  |{"email":"bob@example.com","_file_path":"dbfs:/tmp/autoloader_demo/input/file2.json"}   |
|NULL|Charlie|NULL|{"age":"not_a_number","id":"3","_file_path":"dbfs:/tmp/autoloader_demo/input/file3.json"}|
|NULL|NULL   |NULL|{"Id":4,"Name":"Dave","AGE":40,"_file_path":"dbfs:/tmp/autoloader_demo/input/file4.json"}|
|1   |Alice  |30  |NULL                                                                                     |
+----+-------+----+-----------------------------------------------------------------------------------------+
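The NULL-heavy rows show why rescue mode pairs well with a quarantine step. A minimal sketch, assuming a hypothetical quarantine path: rows with a non-null _rescued_data go to a side table for inspection while clean rows continue downstream:

%python
from pyspark.sql.functions import col

quarantine_target = "dbfs:/tmp/autoloader_demo/quarantine"  # hypothetical path

clean = bronze.filter(col("_rescued_data").isNull())       # rows that matched the schema
suspect = bronze.filter(col("_rescued_data").isNotNull())  # rows needing inspection

suspect.write.format("delta").mode("overwrite").save(quarantine_target)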
In [0]:
%python
# 1) Show the raw rescued column
display(
    bronze.select("id", "name", "age", "_rescued_data")
)

# 2) _rescued_data is a JSON string: extract 'email' (NULL when the key is absent)
display(
    bronze.select(
        "id",
        "name",
        "age",
        get_json_object(col("_rescued_data"), "$.email").alias("email_from_rescue"),
    )
)
id | name | age | _rescued_data
---|------|-----|---------------
2 | Bob | 25 | {"email":"bob@example.com","_file_path":"dbfs:/tmp/autoloader_demo/input/file2.json"}
null | Charlie | null | {"age":"not_a_number","id":"3","_file_path":"dbfs:/tmp/autoloader_demo/input/file3.json"}
null | null | null | {"Id":4,"Name":"Dave","AGE":40,"_file_path":"dbfs:/tmp/autoloader_demo/input/file4.json"}
1 | Alice | 30 | null

id | name | age | email_from_rescue
---|------|-----|-------------------
2 | Bob | 25 | bob@example.com
null | Charlie | null | null
null | null | null | null
1 | Alice | 30 | null
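The case-mismatched record from file4.json can often be repaired directly from the rescue blob. A sketch, assuming the rescued keys keep the exact casing seen above (Id, Name, AGE); note that get_json_object paths are case-sensitive:

%python
from pyspark.sql.functions import coalesce, col, get_json_object

# Fill the typed columns from the rescue blob where they came back NULL
repaired = bronze.select(
    coalesce(col("id"),
             get_json_object(col("_rescued_data"), "$.Id").cast("int")).alias("id"),
    coalesce(col("name"),
             get_json_object(col("_rescued_data"), "$.Name")).alias("name"),
    coalesce(col("age"),
             get_json_object(col("_rescued_data"), "$.AGE").cast("int")).alias("age"),
)
display(repaired)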