
Appending Multiple Source Tables Together in a DLT Pipeline

playbook
Author

Austin Wolff

Published

July 8, 2025


Append-Only

If you have two separate data sources, one streaming source and one static source, and need them appended together (a UNION for those familiar with SQL), the following will show you exactly how to accomplish this within a Databricks DLT Pipeline.

If you’d like to know how to apply CDC to the DLT Pipeline, I will also cover that below in the section titled Change Data Capture.

Difference Between Unioning and Appending Tables

We could simply union the static source and the streaming source together. However, the UNION approach requires what’s called a “full refresh”: all of the data is reprocessed every time the pipeline runs (you can read more here). This creates unnecessary cost.
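For comparison, the UNION version would look something like the materialized view below. This is just a sketch reusing the table names from this post (unioned_table is a hypothetical name); the whole view is recomputed on each pipeline update.

import dlt

# The UNION approach we want to avoid: a materialized view that
# recomputes the full result on every pipeline update.
@dlt.table(name="ee_dlt_union.bronze.unioned_table")
def unioned_table():
  static_df = spark.read.table("ee_dlt_union.bronze.simulate_static")
  stream_df = spark.read.table("ee_dlt_union.bronze.simulate_stream")
  return static_df.unionByName(stream_df)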

Appending a Static Table and a Streaming Table

Instead of performing a UNION, let’s now create the pipeline by appending the two sources. The key function here is dlt.create_streaming_table() to create the target table. Then we use the @dlt.append_flow decorator to append data from our source table into our target table.

First, I’ll show you what the two underlying source tables look like:

# Table #1
# display(spark.sql("SELECT * FROM ee_dlt_union.bronze.simulate_static"))
transaction_id transaction_date product_name quantity amount
1001 2022-01-15 Old Laptop 1 750.00
1002 2022-03-20 Wired Mouse 2 15.00
1003 2022-06-10 Basic Keyboard 1 25.00
1004 2022-11-05 Old Monitor 1 150.00
1005 2023-02-12 Docking Station 1 120.00
# Table #2
# display(spark.sql("SELECT * FROM ee_dlt_union.bronze.simulate_stream"))
transaction_id transaction_date product_name quantity amount
1 2025-06-01 Product A 10 99.99
2 2025-06-02 Product B 5 49.50
3 2025-06-03 Product C 20 199.99
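(If you want to follow along, the source tables can be simulated with something like the following. The schema here is my assumption, inferred from the columns displayed above.)

# Hypothetical setup for the sample tables (schema inferred from the output above)
spark.sql("""
  CREATE TABLE IF NOT EXISTS ee_dlt_union.bronze.simulate_static (
    transaction_id INT,
    transaction_date DATE,
    product_name STRING,
    quantity INT,
    amount DECIMAL(10, 2)
  )
""")

spark.sql("""
  INSERT INTO ee_dlt_union.bronze.simulate_static VALUES
    (1001, DATE'2022-01-15', 'Old Laptop',      1, 750.00),
    (1002, DATE'2022-03-20', 'Wired Mouse',     2,  15.00),
    (1003, DATE'2022-06-10', 'Basic Keyboard',  1,  25.00),
    (1004, DATE'2022-11-05', 'Old Monitor',     1, 150.00),
    (1005, DATE'2023-02-12', 'Docking Station', 1, 120.00)
""")

# simulate_stream is created the same way, with its own three rows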

And now here is the pipeline to append these two sources together:

import dlt

# Create our target table to append data into
dlt.create_streaming_table("ee_dlt_union.bronze.appended_table")

# Append our static data
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table")
def my_static_table():
  return spark.readStream.table("ee_dlt_union.bronze.simulate_static")

# Append our streaming data
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table")
def my_streaming_table():
  return spark.readStream.table("ee_dlt_union.bronze.simulate_stream")

Here is the resulting table:

# Append-Only Table
# display(spark.sql("SELECT * FROM ee_dlt_union.bronze.appended_table"))
transaction_id transaction_date product_name quantity amount
1001 2022-01-15 Old Laptop 1 750.00
1002 2022-03-20 Wired Mouse 2 15.00
1003 2022-06-10 Basic Keyboard 1 25.00
1004 2022-11-05 Old Monitor 1 150.00
1005 2023-02-12 Docking Station 1 120.00
1 2025-06-01 Product A 10 99.99
2 2025-06-02 Product B 5 49.50
3 2025-06-03 Product C 20 199.99

Appending More Than 2 Sources

And here is another example that reads in CSV files from three different directories and appends them to the target table.

You’ll notice the example below reads files with the cloudFiles format. This uses Databricks Auto Loader, which can incrementally and efficiently process new data files as they arrive in cloud storage, without any additional setup. In other words, with Auto Loader, as new data files land in your cloud storage, they are automatically picked up and processed by the pipeline. You can read more about Auto Loader here.

import dlt

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_orders_us():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")
  )

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")
  )

# Additional flows can be added without the
# full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")
  )

As you can see, appending any number of sources together is easy with the dlt.create_streaming_table() function and the @dlt.append_flow decorator.

If you only care about ingesting new data from multiple sources, and do not care about ingesting any updates to the underlying source data, this setup is good enough.

However, if you need to track when the underlying source data is updated or deleted, keep reading to see how to add Change Data Capture (CDC) to the pipeline.

Note

A Quick Note on Streaming Tables vs Materialized Views

You’ll notice above that each of the data sources is read using spark.readStream, creating Streaming Tables, not Materialized Views. It’s almost always preferable to create a Streaming Table when ingesting data.

As a rule of thumb, you should only create Materialized Views for static table joins (if you need to join your streaming data with static data to enrich it) or for your Gold Layer tables. Like all rules of thumb, there are exceptions.
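For example, a Gold-layer aggregate over the appended table could be defined as a Materialized View. This is just an illustrative sketch; the gold schema, table name, and aggregation are hypothetical.

import dlt
from pyspark.sql.functions import sum as sum_

# Gold-layer Materialized View: a batch read aggregated by product.
# Because this is a @dlt.table over a batch DataFrame, DLT maintains
# it as a materialized view rather than a streaming table.
@dlt.table(name="ee_dlt_union.gold.sales_by_product")
def sales_by_product():
  return (spark.read.table("ee_dlt_union.bronze.appended_table")
    .groupBy("product_name")
    .agg(sum_("amount").alias("total_amount"),
         sum_("quantity").alias("total_quantity"))
  )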

Change Data Capture

What if the Underlying Source Data is Updated or Deleted?

Our simple @dlt.append_flow decorator only INSERTS data. But what if we need to ingest CHANGES to the underlying data, such as UPDATES or DELETES?

For example, if a customer moves and now has a new home address, and you want to overwrite the data to keep only the new address, that requires an UPDATE to your target table. We call this kind of data a slowly changing dimension (SCD), and there are two common ways of handling SCDs:

Type 1: Overwrite

Type 2: Add New Row

(Technically there are more than two types, but these are the most common.)

If we want to “capture” changes to our underlying data, whether they are Type 1 or Type 2 changes, we need to incorporate CDC (change data capture) into our pipeline.

CDC: Slowly Changing Dimensions (SCD) Type 1

Type 1 SCDs simply involve overwriting data (and you don’t care about the historical data). If we want to incorporate CDC for this kind of data, we’ll rely on the dlt.create_auto_cdc_flow() function.

We also need to make sure to utilize Delta Lake’s “change data feed” to track which rows are INSERTS, UPDATES, or DELETES. This is done by reading with spark.readStream.format("delta").option("readChangeData", "true") before selecting your table via .table("your_table"). See the pipeline below for an example.
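One prerequisite: the change data feed has to be enabled on the source Delta tables before those readChangeData reads will work. If it wasn’t enabled at table creation, you can turn it on with a table property, for example:

# Enable the change data feed on the two source tables (if not already enabled)
spark.sql("""
  ALTER TABLE ee_dlt_union.bronze.simulate_static
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

spark.sql("""
  ALTER TABLE ee_dlt_union.bronze.simulate_stream
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")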

To demonstrate the pipeline, I’ll update a row.

# Data before the update
# display(spark.sql("SELECT * FROM ee_dlt_union.bronze.simulate_stream"))
transaction_id transaction_date product_name quantity amount
1 2025-06-01 Product A 10 99.99
2 2025-06-02 Product B 5 49.50
3 2025-06-03 Product C 20 199.99
# # Let's update a row
# spark.sql("""
#   UPDATE ee_dlt_union.bronze.simulate_stream
#   SET quantity = 107
#   WHERE transaction_id = 2;
#   """)

# # Data after the change
# display(spark.sql("""
#   SELECT * 
#   FROM ee_dlt_union.bronze.simulate_stream 
#   ORDER BY transaction_id"""))
transaction_id transaction_date product_name quantity amount
1 2025-06-01 Product A 10 99.99
2 2025-06-02 Product B 107 49.50
3 2025-06-03 Product C 20 199.99

To further demonstrate our pipeline, I’ll also delete a row. Let’s get rid of the row where transaction_id = 3.

# # Let's delete a row
# spark.sql("""
#   DELETE FROM ee_dlt_union.bronze.simulate_stream
#   WHERE transaction_id = 3;
#   """)

# # Data after the change
# display(spark.sql("""
#   SELECT * 
#   FROM ee_dlt_union.bronze.simulate_stream 
#   ORDER BY transaction_id"""))
transaction_id transaction_date product_name quantity amount
1 2025-06-01 Product A 10 99.99
2 2025-06-02 Product B 107 49.50

Great. We’ve updated the row where transaction_id = 2 and deleted the row where transaction_id = 3. Let’s run the pipeline and display the results below.

import dlt
from pyspark.sql.functions import expr

# Create our bronze table to append CDC data into
dlt.create_streaming_table("ee_dlt_union.bronze.appended_table_cdc")

# Create our silver table to insert cleaned CDC data into
dlt.create_streaming_table("ee_dlt_union.silver.target_table")

# Notice the difference below of how we read in 
# the same tables from before, but this time we
# include the change data feed columns

# Append our static data with its "change data feed"
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table_cdc")
def my_static_table():
  return (spark.readStream.format("delta")
    .option("readChangeData", "true")
    .table("ee_dlt_union.bronze.simulate_static")
  )

# Append our streaming data with its "change data feed"
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table_cdc")
def my_streaming_table():
  return (spark.readStream.format("delta")
    .option("readChangeData", "true")
    .table("ee_dlt_union.bronze.simulate_stream")
  )

# Now we will use this function to create
# our CDC flow
dlt.create_auto_cdc_flow(
  target = "ee_dlt_union.silver.target_table",  # Set our target table
  source = "ee_dlt_union.bronze.appended_table_cdc",  # We will look for data here
  keys = ["transaction_id"],  # What we'll use to match the rows to upsert
  sequence_by = "_commit_version",
  ignore_null_updates = False, 
  apply_as_deletes = expr("_change_type = 'delete'")
)

Now that the pipeline has run, I want to show you what the change data feed (CDF) looks like. The CDF was captured for both tables using the spark.readStream.format("delta").option("readChangeData", "true") command, then appended to ee_dlt_union.bronze.appended_table_cdc. Here’s what that table actually looks like:

# # Data after the change
# display(spark.sql("""
#   SELECT * 
#   FROM ee_dlt_union.bronze.appended_table_cdc
#   ORDER BY transaction_id
#   """))
transaction_id transaction_date product_name quantity amount _change_type _commit_version _commit_timestamp
1 2025-06-01 Product A 10 99.99 insert 38 2025-07-17T04:53:47Z
1001 2022-01-15 Old Laptop 1 750.00 insert 9 2025-07-17T04:53:46Z
1002 2022-03-20 Wired Mouse 2 15.00 insert 9 2025-07-17T04:53:46Z
1003 2022-06-10 Basic Keyboard 1 25.00 insert 9 2025-07-17T04:53:46Z
1004 2022-11-05 Old Monitor 1 150.00 insert 9 2025-07-17T04:53:46Z
1005 2023-02-12 Docking Station 1 120.00 insert 9 2025-07-17T04:53:46Z
2 2025-06-02 Product B 5 49.50 update_preimage 39 2025-07-17T04:55:51Z
2 2025-06-02 Product B 5 49.50 insert 38 2025-07-17T04:53:47Z
2 2025-06-02 Product B 107 49.50 update_postimage 39 2025-07-17T04:55:51Z
3 2025-06-03 Product C 20 199.99 insert 38 2025-07-17T04:53:47Z
3 2025-06-03 Product C 20 199.99 delete 41 2025-07-17T04:56:02Z

Notice the three additional columns, _change_type, _commit_version, and _commit_timestamp. The _change_type column tells us the kind of change that was captured (notice the update_preimage and update_postimage where transaction_id = 2). You’ll also notice I’ve been testing and tweaking this pipeline for a while for this Entropy Exchange post, hence the high number of commits in the _commit_version column.
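As a side note, you can also inspect a table’s change data feed directly, outside the pipeline, with the table_changes function. A quick sketch, assuming the starting version you pass is at or after the version where the change data feed was enabled:

# Inspect the raw change data feed of one source table.
# The starting version (0 here) must be at or after the version
# where the change data feed was enabled on the table.
display(spark.sql("""
  SELECT transaction_id, quantity, _change_type, _commit_version
  FROM table_changes('ee_dlt_union.bronze.simulate_stream', 0)
  ORDER BY _commit_version
  """))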

Now let’s query the target silver table to see if changes were applied:

# # Data after the change
# display(spark.sql("""
#   SELECT * 
#   FROM ee_dlt_union.silver.target_table
#   ORDER BY transaction_id
#   """))
transaction_id transaction_date product_name quantity amount _change_type _commit_version _commit_timestamp
1 2025-06-01 Product A 10 99.99 insert 38 2025-07-17T04:53:47Z
1001 2022-01-15 Old Laptop 1 750.00 insert 9 2025-07-17T04:53:46Z
1002 2022-03-20 Wired Mouse 2 15.00 insert 9 2025-07-17T04:53:46Z
1003 2022-06-10 Basic Keyboard 1 25.00 insert 9 2025-07-17T04:53:46Z
1004 2022-11-05 Old Monitor 1 150.00 insert 9 2025-07-17T04:53:46Z
1005 2023-02-12 Docking Station 1 120.00 insert 9 2025-07-17T04:53:46Z
2 2025-06-02 Product B 107 49.50 update_postimage 39 2025-07-17T04:55:51Z

Success! The row where transaction_id = 2 had its quantity updated, and the row where transaction_id = 3 was deleted.

Now if data is INSERTED, UPDATED, or DELETED from either the bronze “static” or “streaming” tables, those changes will be recorded in the bronze.appended_table_cdc table, and then applied to silver.target_table.

For example, if a record is deleted from the streaming source, we will still be able to see that record in the bronze.appended_table_cdc table, with its _change_type column set to delete. But that record will then be deleted from silver.target_table.

CDC: Slowly Changing Dimensions (SCD) Type 2

If you would like to keep a historical record of any updated data, known as a Type 2 Slowly Changing Dimension (such as a customer’s current and previous addresses), all you have to do is adjust the dlt.create_auto_cdc_flow() function to include the argument stored_as_scd_type = 2. Take a look at the code below. It’s essentially identical to the code above, except for the new argument in the dlt.create_auto_cdc_flow() function.

import dlt
from pyspark.sql.functions import expr

# Create our bronze table to append CDC data into
dlt.create_streaming_table("ee_dlt_union.bronze.appended_table_cdc")

# Create our silver table to insert cleaned CDC data into
dlt.create_streaming_table("ee_dlt_union.silver.target_table_scd_type_2")

# Notice the difference below of how we read in 
# the same tables from before, but this time we
# include the change data feed columns

# Append our static data with its "change data feed"
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table_cdc")
def my_static_table():
  return (spark.readStream.format("delta")
    .option("readChangeData", "true")
    .table("ee_dlt_union.bronze.simulate_static")
  )

# Append our streaming data with its "change data feed"
@dlt.append_flow(target="ee_dlt_union.bronze.appended_table_cdc")
def my_streaming_table():
  return (spark.readStream.format("delta")
    .option("readChangeData", "true")
    .table("ee_dlt_union.bronze.simulate_stream")
  )

# Now we will use this function to create
# our CDC flow
dlt.create_auto_cdc_flow(
  target = "ee_dlt_union.silver.target_table_scd_type_2",  # Set our target table
  source = "ee_dlt_union.bronze.appended_table_cdc",  # We will look for data here
  keys = ["transaction_id"],  # What we'll use to match the rows to upsert
  sequence_by = "_commit_version",
  ignore_null_updates = False, 
  apply_as_deletes = expr("_change_type = 'delete'"),
  stored_as_scd_type = 2 # THIS TELLS THE FUNCTION WE WANT SCD TYPE 2
)

This will add two additional columns to the silver.target_table_scd_type_2 table: __START_AT and __END_AT. You can learn more about Type 2 SCDs here.
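With those columns in place, the current version of each record is the row whose __END_AT is null, so a “latest state” query might look something like this sketch:

# Current (active) rows only: records whose validity window is still open
display(spark.sql("""
  SELECT *
  FROM ee_dlt_union.silver.target_table_scd_type_2
  WHERE __END_AT IS NULL
  ORDER BY transaction_id
  """))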

Conclusion

You should now have everything you need to build a pipeline that appends data from multiple sources, whether static or streaming, and to add CDC if needed. If you have any questions, don’t hesitate to reach out.