Table deletes, updates, and merges

Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.


Configure SparkSession

For many Delta Lake operations, you must enable the integration with the Apache Spark DataSourceV2 and Catalog APIs (available since Apache Spark 3.0) by setting the following configurations when creating a new SparkSession.

 from pyspark.sql import SparkSession

 spark = SparkSession \
   .builder \
   .appName("...") \
   .master("...") \
   .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
   .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
   .getOrCreate()

 // Scala
 import org.apache.spark.sql.SparkSession

 val spark = SparkSession
   .builder()
   .appName("...")
   .master("...")
   .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
   .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
   .getOrCreate()

 // Java
 import org.apache.spark.sql.SparkSession;

 SparkSession spark = SparkSession
   .builder()
   .appName("...")
   .master("...")
   .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
   .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
   .getOrCreate();

Alternatively, you can pass the same configurations as command-line parameters when submitting your Spark application with spark-submit or when starting spark-shell or pyspark.

spark-submit --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"  ...
pyspark --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Delete from a table

You can remove data that matches a predicate from a Delta table. For instance, to delete all events from before 2017, you can run the following:

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

See Configure SparkSession for the steps to enable support for SQL commands.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

// Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

// Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.delete("date < '2017-01-01'");            // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions

See the Delta Lake API reference for details.

Important

delete removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for details.

Tip

When possible, provide predicates on the partition columns for a partitioned Delta table as such predicates can significantly speed up the operation.

Update a table

You can update data that matches a predicate in a Delta table. For example, to fix a spelling mistake in the eventType, you can run the following:

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

See Configure SparkSession for the steps to enable support for SQL commands.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

// Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'"))

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")))

// Java
import io.delta.tables.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col("eventType").equalTo("clck"),
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);

See the Delta Lake API reference for details.

Tip

Similar to delete, update operations can get a significant speedup with predicates on partitions.

Upsert into a table using merge

You can upsert data from a source table, view, or DataFrame into a target Delta table using the merge operation. This operation is similar to the SQL MERGE INTO command but has additional support for deletes and extra conditions in updates, inserts, and deletes.

Suppose you have a Spark DataFrame that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

See Configure SparkSession for the steps to enable support for SQL commands.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

// Scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

// Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "updates.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();

See the Delta Lake API reference for Scala, Java, and Python syntax details.

Operation semantics

Here is a detailed description of the programmatic merge operation.

  • There can be any number of whenMatched and whenNotMatched clauses.

    Note

    As of Apache Spark 3.0, the MERGE SQL command can have at most two WHEN MATCHED clauses and at most one WHEN NOT MATCHED clause. You can have any number of clauses using the Scala, Java, and Python APIs.

  • whenMatched clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.

    • whenMatched clauses can have at most one update and one delete action. The update action in merge only updates the specified columns (similar to the update operation) of the matched target row. The delete action deletes the matched row.

    • Each whenMatched clause can have an optional condition. If this clause condition exists, the update or delete action is executed for any matching source-target row pair only when the clause condition is true.

    • If there are multiple whenMatched clauses, then they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenMatched clauses, except the last one, must have conditions.

    • If multiple whenMatched clauses have conditions and none of the conditions are true for a matching source-target row pair, then the matched target row is left unchanged.

    • To update all the columns of the target Delta table with the corresponding columns of the source dataset, use whenMatched(...).updateAll(). This is equivalent to:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query throws an analysis error.

      Note

      This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.

  • whenNotMatched clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.

    • whenNotMatched clauses can have only the insert action. The new row is generated based on the specified column and corresponding expressions. You do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted.

    • Each whenNotMatched clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored.

    • If there are multiple whenNotMatched clauses, then they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenNotMatched clauses, except the last one, must have conditions.

    • To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use whenNotMatched(...).insertAll(). This is equivalent to:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query throws an analysis error.

      Note

      This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.
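
The clause-ordering rules above can be sketched in plain Python. This is a simplified model rather than Delta Lake code: `pick_action` and the `(condition, action)` tuples are hypothetical names introduced only for illustration, and each `row` dictionary stands in for a matched source-target pair (or an unmatched source row).

```python
from typing import Callable, List, Optional, Tuple

Condition = Optional[Callable[[dict], bool]]
Clause = Tuple[Condition, str]  # (condition, action name); None = unconditional


def pick_action(clauses: List[Clause], row: dict) -> Optional[str]:
    """Return the action of the first clause whose condition holds, else None."""
    # Mirror the rule that every clause except the last must have a condition.
    for condition, _ in clauses[:-1]:
        if condition is None:
            raise ValueError("only the last clause may omit its condition")
    # Clauses are evaluated in the order they were specified.
    for condition, action in clauses:
        if condition is None or condition(row):
            return action
    return None  # no clause applied: the row is left unchanged (or ignored)


# Hypothetical whenMatched clauses: delete flagged rows, otherwise update.
when_matched = [
    (lambda r: r["deleted"], "delete"),
    (None, "update"),
]

print(pick_action(when_matched, {"key": 1, "deleted": True}))   # delete
print(pick_action(when_matched, {"key": 2, "deleted": False}))  # update
```

If every clause carries a condition and none of them holds, `pick_action` returns `None`, which corresponds to leaving the matched target row unchanged.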

Important

A merge operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the Change data capture example, which preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change to the target Delta table.
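
As a rough illustration of that preprocessing step, the plain-Python sketch below finds keys that appear more than once in a source dataset and keeps only the latest row per key. The helper names and the `time` ordering column are assumptions made for this example; on a real DataFrame the same idea would be expressed with Spark operations.

```python
from collections import Counter
from typing import Dict, List


def ambiguous_keys(source_rows: List[dict], key: str) -> list:
    """Return the key values that occur more than once in the source rows."""
    counts = Counter(row[key] for row in source_rows)
    return sorted(k for k, n in counts.items() if n > 1)


def keep_latest(source_rows: List[dict], key: str, order_by: str) -> List[dict]:
    """Keep only the row with the largest `order_by` value for each key."""
    latest: Dict = {}
    for row in source_rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


# Hypothetical source data: eventId 1 appears twice.
updates = [
    {"eventId": 1, "time": 10, "data": "a"},
    {"eventId": 1, "time": 20, "data": "b"},
    {"eventId": 2, "time": 15, "data": "c"},
]

print(ambiguous_keys(updates, "eventId"))   # [1]
deduped = keep_latest(updates, "eventId", "time")
print(ambiguous_keys(deduped, "eventId"))   # []
```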

Schema validation

merge automatically validates that the schema of the data generated by insert and update expressions is compatible with the schema of the table. It uses the following rules to determine whether the merge operation is compatible:

  • For update and insert actions, the specified target columns must exist in the target Delta table.

  • For updateAll and insertAll actions, the source dataset must have all the columns of the target Delta table. The source dataset can have extra columns and they are ignored.

    If you do not want the extra columns to be ignored and instead want to update the target table schema to include new columns, see Automatic schema evolution.

  • For all actions, if the data type generated by the expressions producing the target columns differs from that of the corresponding columns in the target Delta table, merge tries to cast them to the types in the table.
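
The first two rules can be modeled with a couple of small checks. This is only a sketch under simplifying assumptions: the function names are hypothetical, column names are compared as exact strings, and type casting (the third rule) is not modeled.

```python
from typing import Iterable, List


def missing_explicit_columns(target_cols: Iterable[str],
                             assigned_cols: Iterable[str]) -> List[str]:
    """Columns named in an update/insert action that the target table lacks."""
    target = set(target_cols)
    return [c for c in assigned_cols if c not in target]


def missing_for_all_actions(target_cols: Iterable[str],
                            source_cols: Iterable[str]) -> List[str]:
    """Target columns that updateAll/insertAll cannot fill from the source."""
    source = set(source_cols)
    return [c for c in target_cols if c not in source]


# Extra source columns are fine for updateAll/insertAll: nothing is missing.
print(missing_for_all_actions(["key", "value"], ["key", "value", "newValue"]))  # []

# A target column absent from the source makes updateAll/insertAll invalid.
print(missing_for_all_actions(["key", "oldValue"], ["key", "newValue"]))  # ['oldValue']

# An explicit update of a column that is not in the target is invalid.
print(missing_explicit_columns(["key", "oldValue"], ["newValue"]))  # ['newValue']
```

A non-empty result corresponds to a case where merge would reject the operation.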

Automatic schema evolution

By default, updateAll and insertAll assign all the columns in the target Delta table with columns of the same name from the source dataset. Any columns in the source dataset that don’t match columns in the target table are ignored. However, in some use cases, it is desirable to automatically add source columns to the target Delta table. To automatically update the table schema during a merge operation with updateAll and insertAll (at least one of them), you can set the Spark session configuration spark.databricks.delta.schema.autoMerge.enabled to true before running the merge operation.

Note

  • Schema evolution occurs only when there is either an updateAll or an insertAll action, or both.
  • update and insert actions cannot explicitly refer to target columns that do not already exist in the target table (even if there is an updateAll or insertAll among the clauses). See the examples below.
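
To make the effect on the table schema concrete, here is a minimal plain-Python model of which columns the target table ends up with after a merge that uses updateAll or insertAll. The function `merged_columns` is a hypothetical helper for illustration; it models only the resulting column list, not validation or data changes.

```python
from typing import List


def merged_columns(target_cols: List[str], source_cols: List[str],
                   auto_merge_enabled: bool) -> List[str]:
    """Columns of the target table after a merge with updateAll/insertAll."""
    if not auto_merge_enabled:
        # Default behavior: extra source columns are ignored.
        return list(target_cols)
    # With schema evolution, source-only columns are added to the target.
    extra = [c for c in source_cols if c not in target_cols]
    return list(target_cols) + extra


# In a real session the switch would be set before the merge, for example:
#   spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

print(merged_columns(["key", "value"], ["key", "value", "newValue"], False))
# ['key', 'value']
print(merged_columns(["key", "value"], ["key", "value", "newValue"], True))
# ['key', 'value', 'newValue']
print(merged_columns(["key", "oldValue"], ["key", "newValue"], True))
# ['key', 'oldValue', 'newValue']
```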

Here are a few examples of the effects of merge operation with and without schema evolution.

Each example lists the columns, the query (in Scala), the behavior without schema evolution (default), and the behavior with schema evolution.

Target columns: key, value

Source columns: key, value, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Behavior without schema evolution (default): The table schema remains unchanged; only columns key, value are updated/inserted.

Behavior with schema evolution: The table schema is changed to (key, value, newValue). updateAll updates columns value and newValue, and insertAll inserts rows (key, value, newValue).

Target columns: key, oldValue

Source columns: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Behavior without schema evolution (default): updateAll and insertAll actions throw an error because the target column oldValue is not in the source.

Behavior with schema evolution: The table schema is changed to (key, oldValue, newValue). updateAll updates columns key and newValue leaving oldValue unchanged, and insertAll inserts rows (key, NULL, newValue) (that is, oldValue is inserted as NULL).

Target columns: key, oldValue

Source columns: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().update(Map(
    "newValue" -> col("s.newValue")))
  .whenNotMatched().insertAll()
  .execute()

Behavior without schema evolution (default): update throws an error because column newValue does not exist in the target table.

Behavior with schema evolution: update still throws an error because column newValue does not exist in the target table.

Target columns: key, oldValue

Source columns: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insert(Map(
    "key" -> col("s.key"),
    "newValue" -> col("s.newValue")))
  .execute()

Behavior without schema evolution (default): insert throws an error because column newValue does not exist in the target table.

Behavior with schema evolution: insert still throws an error as column newValue does not exist in the target table.

Performance tuning

You can reduce the time taken by merge using the following approaches:

  • Reduce the search space for matches: By default, the merge operation searches the entire Delta table to find matches in the source table. One way to speed up merge is to reduce the search space by adding known constraints in the match condition. For example, suppose you have a table that is partitioned by country and date and you want to use merge to update information for the last day and a specific country. Adding the condition

    events.date = current_date() AND events.country = 'USA'
    

    will make the query faster as it looks for matches only in the relevant partitions. Furthermore, it will also reduce the chances of conflicts with other concurrent operations. See Concurrency control for more details.

  • Compact files: If the data is stored in many small files, reading the data to search for matches can become slow. You can compact small files into larger files to improve read throughput. See Compact files for details.

  • Control the shuffle partitions for writes: The merge operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration spark.sql.shuffle.partitions. Setting this parameter not only controls the parallelism but also determines the number of output files. Increasing the value increases parallelism but also generates a larger number of smaller data files.

  • Repartition output data before write: For partitioned tables, merge can produce a much larger number of small files than the number of shuffle partitions. This is because every shuffle task can write multiple files in multiple partitions, which can become a performance bottleneck. In many cases, it helps to repartition the output data by the table’s partition columns before writing it. You enable this by setting the Spark session configuration spark.databricks.delta.merge.repartitionBeforeWrite.enabled to true.
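
The two session settings named above could be applied together before running a merge, along the lines of the sketch below. The helper name `tune_merge_session` is hypothetical, and `spark` is assumed to be an existing SparkSession; appropriate values depend on your data volume and cluster.

```python
def tune_merge_session(spark, shuffle_partitions: int) -> None:
    """Apply the merge-related session settings discussed above.

    `spark` is assumed to be an active SparkSession (anything exposing
    `conf.set(key, value)` works for this sketch).
    """
    # Controls shuffle parallelism and, indirectly, the number of output files.
    spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))
    # Repartition merge output by the table's partition columns before writing.
    spark.conf.set(
        "spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
```

For example, `tune_merge_session(spark, 64)` would set 64 shuffle partitions; the value 64 is purely illustrative.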

Merge examples

Here are a few examples of how to use merge in different scenarios.

Data deduplication when writing into Delta tables

A common ETL use case is to collect logs into a Delta table by appending them to the table. However, sources often generate duplicate log records, and downstream deduplication steps are needed to handle them. With merge, you can avoid inserting the duplicate records.

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

# Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

// Scala
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

// Java
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Note

The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, it matches and deduplicates the new data with the existing data in the table, but if there is duplicate data within the new dataset, it is inserted. Hence, deduplicate the new data before merging into the table.
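
The following plain-Python model illustrates why the new data must be deduplicated first. It is a simplified stand-in for the merge, not Delta Lake code: `insert_only_merge` and `drop_duplicates` are hypothetical helpers. With a Spark DataFrame, the deduplication step would typically use `dropDuplicates(["uniqueId"])`.

```python
def drop_duplicates(records, key):
    """Keep the first record seen for each key value."""
    seen, unique = set(), []
    for record in records:
        if record[key] not in seen:
            seen.add(record[key])
            unique.append(record)
    return unique


def insert_only_merge(table, new_records, key):
    """Insert every new record whose key is absent from the existing table.

    Matching is done against the existing table only, so duplicates
    inside `new_records` are all inserted.
    """
    existing = {row[key] for row in table}
    return table + [r for r in new_records if r[key] not in existing]


logs = [{"uniqueId": 1}]
new_logs = [{"uniqueId": 2}, {"uniqueId": 2}, {"uniqueId": 1}]

without_dedup = insert_only_merge(logs, new_logs, "uniqueId")
with_dedup = insert_only_merge(
    logs, drop_duplicates(new_logs, "uniqueId"), "uniqueId")

print([r["uniqueId"] for r in without_dedup])  # [1, 2, 2]
print([r["uniqueId"] for r in with_dedup])     # [1, 2]
```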

If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

# Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

// Scala
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

// Java
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table. Furthermore, you can use this insert-only merge with Structured Streaming to perform continuous deduplication of the logs.

  • In a streaming query, you can use the merge operation in foreachBatch to continuously write any streaming data to a Delta table with deduplication. See the following streaming example for more information on foreachBatch.
  • In another streaming query, you can continuously read deduplicated data from this Delta table. This is possible because an insert-only merge only appends new data to the Delta table.

Slowly changing data (SCD) Type 2 operation into Delta tables

Another common operation is SCD Type 2, which maintains the history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.

Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.

val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)

val updatesDF: DataFrame = ...          // DataFrame with schema (customerId, address, effectiveDate)

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerid")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.
  .union(
    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.
  )

// Apply SCD Type 2 operation using merge
customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(                                      // Set current to false and endDate to source's effective date.
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(
    "customerid" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",  // Set current to true along with the new address and its effective date.
    "endDate" -> "null"))
  .execute()

# Python
customersTable = ...  # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)

updatesDF = ...       # DataFrame with schema (customerId, address, effectiveDate)

# Rows to INSERT new addresses of existing customers
newAddressesToInsert = updatesDF \
  .alias("updates") \
  .join(customersTable.toDF().alias("customers"), "customerid") \
  .where("customers.current = true AND updates.address <> customers.address")

# Stage the update by unioning two sets of rows
# 1. Rows that will be inserted in the whenNotMatched clause
# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
stagedUpdates = (
  newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1
  .union(updatesDF.selectExpr("updates.customerId as mergeKey", "*"))  # Rows for 2.
)

# Apply SCD Type 2 operation using merge
customersTable.alias("customers").merge(
  stagedUpdates.alias("staged_updates"),
  "customers.customerId = mergeKey") \
.whenMatchedUpdate(
  condition = "customers.current = true AND customers.address <> staged_updates.address",
  set = {                                      # Set current to false and endDate to source's effective date.
    "current": "false",
    "endDate": "staged_updates.effectiveDate"
  }
).whenNotMatchedInsert(
  values = {
    "customerid": "staged_updates.customerId",
    "address": "staged_updates.address",
    "current": "true",
    "effectiveDate": "staged_updates.effectiveDate",  # Set current to true along with the new address and its effective date.
    "endDate": "null"
  }
).execute()
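
To see why the staged rows carry a NULL mergeKey, it can help to trace the same logic on small in-memory data. The sketch below is a plain-Python model of the staging and merge steps, not Delta Lake code; `scd2_merge` and the sample rows are made up for illustration, and it assumes at most one update per customer.

```python
from typing import List


def scd2_merge(customers: List[dict], updates: List[dict]) -> List[dict]:
    """Plain-Python model of the staged SCD Type 2 merge (illustrative only)."""
    current = {c["customerId"]: c for c in customers if c["current"]}

    # Rows for 1: new addresses of existing customers, staged with mergeKey
    # None so they can never match a target row and are therefore inserted.
    staged = [
        dict(u, mergeKey=None)
        for u in updates
        if u["customerId"] in current
        and current[u["customerId"]]["address"] != u["address"]
    ]
    # Rows for 2: every update, keyed by its customerId.
    staged += [dict(u, mergeKey=u["customerId"]) for u in updates]

    snapshot = [dict(c) for c in customers]  # target rows as of merge start
    inserted = []
    for s in staged:
        matches = [t for t in snapshot if t["customerId"] == s["mergeKey"]]
        if matches:
            # whenMatched: close the current row if the address changed.
            for t in matches:
                if t["current"] and t["address"] != s["address"]:
                    t["current"] = False
                    t["endDate"] = s["effectiveDate"]
        else:
            # whenNotMatched: insert the staged row as the current address.
            inserted.append({
                "customerId": s["customerId"],
                "address": s["address"],
                "current": True,
                "effectiveDate": s["effectiveDate"],
                "endDate": None,
            })
    return snapshot + inserted


customers = [{"customerId": 1, "address": "1 Old St", "current": True,
              "effectiveDate": "2018-01-01", "endDate": None}]
updates = [
    {"customerId": 1, "address": "9 New Rd", "effectiveDate": "2020-06-01"},
    {"customerId": 2, "address": "5 Elm St", "effectiveDate": "2020-06-01"},
]
result = scd2_merge(customers, updates)
for row in result:
    print(row)
```

In this run, customer 1 ends up with a closed row for the old address plus a new current row, and customer 2 is inserted as a new customer.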

Write change data into a Delta table

Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta table. You can do this using merge as follows.

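import io.delta.tables._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._   // provides max

val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)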

// DataFrame with changes having following columns
// - key: key of the change
// - time: time of change for ordering between changes (can be replaced by other ordering id)
// - newValue: updated or inserted value if key was not deleted
// - deleted: true if the key was deleted, false if the key was inserted or updated
val changesDF: DataFrame = ...

// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols" )
  .groupBy("key")
  .agg(max("otherCols").as("latest"))
  .selectExpr("key", "latest.*")

deltaTable.as("t")
  .merge(
    latestChangeForEachKey.as("s"),
    "s.key = t.key")
  .whenMatched("s.deleted = true")
  .delete()
  .whenMatched()
  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .whenNotMatched("s.deleted = false")
  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .execute()

# Python
deltaTable = ... # DeltaTable with schema (key, value)

# DataFrame with changes having following columns
# - key: key of the change
# - time: time of change for ordering between changes (can be replaced by other ordering id)
# - newValue: updated or inserted value if key was not deleted
# - deleted: true if the key was deleted, false if the key was inserted or updated
changesDF = spark.table("changes")

# Find the latest change for each key based on the timestamp
# Note: For nested structs, max on struct is computed as
# max on first struct field, if equal fall back to second fields, and so on.
from pyspark.sql import functions as F   # F.max is the Spark aggregate, not Python's built-in max

latestChangeForEachKey = changesDF \
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \
  .groupBy("key") \
  .agg(F.max("otherCols").alias("latest")) \
  .select("key", "latest.*")

deltaTable.alias("t").merge(
    latestChangeForEachKey.alias("s"),
    "s.key = t.key") \
  .whenMatchedDelete(condition = "s.deleted = true") \
  .whenMatchedUpdate(set = {
    "key": "s.key",
    "value": "s.newValue"
  }) \
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = {
      "key": "s.key",
      "value": "s.newValue"
    }
  ).execute()
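
The "max on struct" step used to pick the latest change per key behaves like lexicographic comparison of tuples, which can be illustrated in plain Python. This is only an analogy on in-memory data with made-up sample rows; it assumes change times are unique per key, so the comparison never has to fall back to the later fields.

```python
from collections import defaultdict


def latest_change_per_key(changes):
    """Return, per key, the (time, newValue, deleted) tuple with the largest time."""
    grouped = defaultdict(list)
    for change in changes:
        grouped[change["key"]].append(
            (change["time"], change["newValue"], change["deleted"]))
    # max() on tuples compares the first field, then the second on ties, and
    # so on, which mirrors how max on a struct column is described above.
    return {key: max(structs) for key, structs in grouped.items()}


changes = [
    {"key": "a", "time": 1, "newValue": "v1", "deleted": False},
    {"key": "a", "time": 3, "newValue": None, "deleted": True},
    {"key": "b", "time": 2, "newValue": "v2", "deleted": False},
]

latest = latest_change_per_key(changes)
print(latest["a"])  # (3, None, True)
print(latest["b"])  # (2, 'v2', False)
```

Here key `a` resolves to its deletion at time 3, so the merge would delete it from the target, while key `b` would be updated or inserted with `v2`.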

Upsert from streaming queries using foreachBatch

You can use a combination of merge and foreachBatch (see foreachBatch for more information) to write complex upserts from a streaming query into a Delta table. For example:

  • Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.

    import io.delta.tables._
    import org.apache.spark.sql.DataFrame
    
    val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
    
    // Function to upsert microBatchOutputDF into Delta table using merge
    def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
      deltaTable.as("t")
        .merge(
          microBatchOutputDF.as("s"),
          "s.key = t.key")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
    }
    
    // Write the output of a streaming aggregation query into Delta table
    streamingAggregatesDF.writeStream
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()
    
    from delta.tables import *
    
    deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
    
    # Function to upsert microBatchOutputDF into Delta table using merge
    def upsertToDelta(microBatchOutputDF, batchId):
      deltaTable.alias("t").merge(
          microBatchOutputDF.alias("s"),
          "s.key = t.key") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
    
    # Write the output of a streaming aggregation query into Delta table
    streamingAggregatesDF.writeStream \
      .format("delta") \
      .foreachBatch(upsertToDelta) \
      .outputMode("update") \
      .start()
    
  • Write a stream of database changes into a Delta table: The merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta table.

  • Write a stream of data into a Delta table with deduplication: The insert-only merge query for deduplication can be used in foreachBatch to continuously write data (with duplicates) to a Delta table with automatic deduplication.

Note

  • Make sure that your merge statement inside foreachBatch is idempotent as restarts of the streaming query can apply the operation on the same batch of data multiple times.
  • When merge is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because merge reads the input data multiple times causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before merge and then uncache it after merge.
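
The caching suggestion in the last note might look like the sketch below. It assumes `delta_table` is a DeltaTable and each micro-batch is a Spark DataFrame; `make_upsert` is a hypothetical helper name, and the default match condition `s.key = t.key` is taken from the example above.

```python
def make_upsert(delta_table, condition="s.key = t.key"):
    """Build a foreachBatch function that caches each batch around the merge."""

    def upsert_to_delta(micro_batch_df, batch_id):
        # Cache the batch so the multiple reads done by merge reuse it.
        micro_batch_df.persist()
        try:
            (delta_table.alias("t")
                .merge(micro_batch_df.alias("s"), condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
        finally:
            # Release the cached batch even if the merge fails.
            micro_batch_df.unpersist()

    return upsert_to_delta
```

The returned function can be passed to `foreachBatch`, for example `.foreachBatch(make_upsert(deltaTable))`. Because the merge upserts by key, applying the same batch again after a restart produces the same table contents, which is in line with the idempotency requirement in the first note.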