Table Deletes, Updates, and Merges
Delta Lake supports several statements to facilitate updating and deleting data from Delta Lake tables. You can use programmatic APIs and Spark DataFrames and Datasets to perform these updates.
Delete from a table
You can remove data that matches a predicate from a Delta Lake table. For instance, to delete all events from before 2017, you can run the following:
- Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete($"date" < "2017-01-01")       // predicate using Spark SQL functions and implicits
- Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);

deltaTable.delete("date < '2017-01-01'");   // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions
See Programmatic API Docs for more details.
Important
delete removes the data from the latest version of the Delta Lake table but does not remove it from physical storage until the old versions are explicitly vacuumed. See vacuum for more details.
Tip
When possible, provide predicates on the partition columns for a partitioned Delta Lake table as such predicates can significantly speed up the operation.
Update a table
You can update data that matches a predicate in a Delta Lake table. For example, to fix a spelling mistake in the eventType column, you can run the following:
- Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'"))

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  $"eventType" === "clck",
  Map("eventType" -> lit("click")))
- Java
import io.delta.tables.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col("eventType").equalTo("clck"),
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);
See Programmatic API Docs for more details.
Tip
Similar to delete, update operations can get a significant speedup with predicates on partitions.
Upsert into a table using merge
You can upsert data from a Spark DataFrame into a Delta Lake table using the merge operation. This operation is similar to the SQL MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes.
Suppose you have a Spark DataFrame that contains new data for events with eventId. Some of these events may already be present in the events table. So when you merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:
- Scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, pathToEventsTable)
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
- Java
import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, pathToEventsTable)
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "updates.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();
Here is a detailed description of the merge operation.
- There can be 1, 2, or 3 when clauses. Of these, at most 2 can be whenMatched clauses, and at most 1 can be a whenNotMatched clause.
- whenMatched clauses:
  - There can be at most one update action and one delete action in whenMatched clauses.
  - Each whenMatched clause can have an optional condition. However, if there are two whenMatched clauses, then the first one must have a condition.
  - When there are two whenMatched clauses and their conditions (or the lack of one) are such that a row matches both clauses, the first clause's action is executed. In other words, the order of the whenMatched clauses matters.
  - If none of the whenMatched clauses match a source-target row pair that satisfies the merge condition, then the target row is not updated.
  - If you want to update all the columns of the target Delta Lake table with the corresponding columns of the source DataFrame, use whenMatched(...).updateAll(). This is equivalent to updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
- whenNotMatched clause:
  - It can have only the insert action, which can have an optional condition.
  - If the whenNotMatched clause is not present, or if it is present but the non-matching source row does not satisfy the condition, then the source row is not inserted.
  - If you want to insert all the columns of the target Delta Lake table with the corresponding columns of the source DataFrame, use whenNotMatched(...).insertAll(). This is equivalent to insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
See Programmatic API Docs for more details.
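The clause rules described above can be pictured with a small plain-Java model. This is only a sketch of the selection logic for a single source row; it does not call the Delta Lake API, and the names MergeClauseModel, Clause, and resolve are hypothetical, introduced here for illustration.

```java
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of how merge picks an action for one source row.
// Illustration only: nothing here is part of the Delta Lake API.
final class MergeClauseModel {

    // A clause pairs an optional condition (null means "no condition") with an action name.
    static final class Clause {
        final Predicate<String> condition; // evaluated against a hypothetical row value
        final String action;               // "update", "delete", or "insert"

        Clause(Predicate<String> condition, String action) {
            this.condition = condition;
            this.action = action;
        }
    }

    // Returns the action chosen for a row, or "none" when no clause applies.
    static String resolve(boolean matched, String row,
                          List<Clause> whenMatched, Clause whenNotMatched) {
        if (matched) {
            // whenMatched clauses are tried in order; the first whose condition holds wins.
            for (Clause c : whenMatched) {
                if (c.condition == null || c.condition.test(row)) {
                    return c.action;
                }
            }
            return "none"; // the target row is left unchanged
        }
        if (whenNotMatched != null
                && (whenNotMatched.condition == null || whenNotMatched.condition.test(row))) {
            return whenNotMatched.action;
        }
        return "none"; // the source row is not inserted
    }
}
```

With a conditional delete clause followed by an unconditional update clause, a matched row that satisfies the delete condition is deleted and every other matched row is updated, which is why the first of two whenMatched clauses needs a condition.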
Tip
You should add as much information as possible to the merge condition to both reduce the amount of work and reduce the chances of transaction conflicts. For example, suppose you have a table that is partitioned by country and date and you want to use merge to update information for the last day, country by country. Adding the condition events.date = current_date() AND events.country = 'USA' will make the query faster, as it will look for matches only in the relevant partitions.
merge examples
Here are a few examples of how to use merge in different scenarios.
Data deduplication when writing into Delta Lake tables
A common ETL use case is to collect logs into a Delta Lake table by appending them to the table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.
deltaTable
  .as("logs")
  .merge(
    updates.as("updates"),
    "logs.uniqueId = updates.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
Furthermore, if you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date and then specifying the date range of the target table to match on.
deltaTable
  .as("logs")
  .merge(
    updates.as("updates"),
    "logs.uniqueId = updates.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("updates.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
This is more efficient than the previous command because it looks for duplicates only in the last 7 days of logs, not the entire table.
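To see why the windowed version needs a condition on both sides, here is a minimal plain-Java model of the two predicates. It is a sketch only and does not use the Delta Lake API; WindowedDedupModel, LogRecord, rowsToInsert, and the integer day numbers are hypothetical stand-ins for the table, its rows, and the date column.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the windowed, insert-only merge. Illustration only.
final class WindowedDedupModel {

    static final class LogRecord {
        final String uniqueId;
        final int day; // hypothetical day number standing in for the date column

        LogRecord(String uniqueId, int day) {
            this.uniqueId = uniqueId;
            this.day = day;
        }
    }

    // Returns the source records that the merge would insert into the table.
    static List<LogRecord> rowsToInsert(List<LogRecord> table, List<LogRecord> updates, int cutoffDay) {
        List<LogRecord> inserted = new ArrayList<>();
        for (LogRecord u : updates) {
            boolean matched = false;
            for (LogRecord t : table) {
                // Merge condition: same id AND the target row lies inside the window.
                if (t.uniqueId.equals(u.uniqueId) && t.day > cutoffDay) {
                    matched = true;
                    break;
                }
            }
            // whenNotMatched condition: only source rows inside the window are inserted.
            if (!matched && u.day > cutoffDay) {
                inserted.add(u);
            }
        }
        return inserted;
    }
}
```

In this model, a source record older than the window never matches, because older target rows are excluded from the merge condition. Without the whenNotMatched condition it would be inserted again as if it were new, so the condition on updates.date is what keeps the narrower match from reintroducing old duplicates.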
Slowly changing data (SCD) Type 2 operation into Delta Lake tables
Another common operation is SCD Type 2, which maintains the history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and then inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge. Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.
val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)

val updatesDF: DataFrame = ...          // DataFrame with schema (customerId, address, effectiveDate)

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerid")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the `whenNotMatched` clause
// 2. Rows that will either UPDATE the current addresses of existing customers or INSERT the new addresses of new customers
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.
  .union(
    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.
  )

// Apply SCD Type 2 operation using merge
customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(                                      // Set current to false and endDate to source's effective date.
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(                                      // Set current to true along with the new address and its effective date.
    "customerid" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",
    "endDate" -> "null"))
  .execute()
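The net effect of the SCD Type 2 merge on a single update can be sketched in plain Java. This is a model of the outcome only, not of the staging trick with mergeKey, and it does not use the Delta Lake API; Scd2Model, CustomerRow, and applyUpdate are hypothetical names.

```java
import java.util.List;

// Plain-Java model of the SCD Type 2 outcome for one incoming update. Illustration only.
final class Scd2Model {

    static final class CustomerRow {
        final int customerId;
        final String address;
        boolean current;
        final String effectiveDate;
        String endDate; // null while the row is the current one

        CustomerRow(int customerId, String address, boolean current,
                    String effectiveDate, String endDate) {
            this.customerId = customerId;
            this.address = address;
            this.current = current;
            this.effectiveDate = effectiveDate;
            this.endDate = endDate;
        }
    }

    // Applies one update (customerId, address, effectiveDate) to the table in place.
    static void applyUpdate(List<CustomerRow> table, int customerId,
                            String address, String effectiveDate) {
        CustomerRow currentRow = null;
        for (CustomerRow r : table) {
            if (r.customerId == customerId && r.current) {
                currentRow = r;
            }
        }
        if (currentRow != null && currentRow.address.equals(address)) {
            return; // address unchanged: no row is updated or inserted
        }
        if (currentRow != null) {
            // Close the previous address: no longer current, ends when the new one starts.
            currentRow.current = false;
            currentRow.endDate = effectiveDate;
        }
        // Add the new address as the current one (also covers brand-new customers).
        table.add(new CustomerRow(customerId, address, true, effectiveDate, null));
    }
}
```

A changed address therefore touches two rows for the same customer, one closed and one added. A single merge can apply only one action per source row, which is why the Scala example stages each such update twice, once with a NULL mergeKey and once with the real key.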
Write change data into a Delta Lake table
Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta Lake table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta Lake table. You can do this using merge as follows.
import org.apache.spark.sql.functions.max

// DataFrame with changes having following columns
// - key: key of the change
// - time: time of change for ordering between changes (can be replaced by other ordering id)
// - newValue: updated or inserted value if key was not deleted
// - deleted: true if the key was deleted, false if the key was inserted or updated
val changesDF: DataFrame = ...

// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols")
  .groupBy("key")
  .agg(max("otherCols").as("latest"))
  .selectExpr("key", "latest.*")

deltaTable.as("t")
  .merge(
    latestChangeForEachKey.as("s"),
    "s.key = t.key")
  .whenMatched("s.deleted = true")
  .delete()
  .whenMatched()
  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .whenNotMatched("s.deleted = false")
  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .execute()
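The two steps in this example, reducing the change set to the latest change per key and then applying it, can be modeled in plain Java as follows. This is a sketch that does not use Spark or the Delta Lake API; CdcModel, Change, latestPerKey, and apply are hypothetical names. As a simplifying assumption, ties on time keep the change seen first, whereas the struct-based max falls back to the remaining struct fields.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "latest change per key, then merge". Illustration only.
final class CdcModel {

    static final class Change {
        final String key;
        final long time;
        final String newValue; // null when the key was deleted
        final boolean deleted;

        Change(String key, long time, String newValue, boolean deleted) {
            this.key = key;
            this.time = time;
            this.newValue = newValue;
            this.deleted = deleted;
        }
    }

    // Keeps only the change with the greatest time for each key.
    static Map<String, Change> latestPerKey(List<Change> changes) {
        Map<String, Change> latest = new HashMap<>();
        for (Change c : changes) {
            latest.merge(c.key, c, (a, b) -> b.time > a.time ? b : a);
        }
        return latest;
    }

    // Applies the latest change per key to a key -> value table, in place.
    static void apply(Map<String, String> table, List<Change> changes) {
        for (Change c : latestPerKey(changes).values()) {
            if (c.deleted) {
                // Matched and deleted: remove the row. Unmatched and deleted: nothing to insert.
                table.remove(c.key);
            } else {
                // Matched: update the value. Not matched: insert the key.
                table.put(c.key, c.newValue);
            }
        }
    }
}
```

Reducing to one change per key first matters because a merge expects each target row to match at most one source row. Applying several changes for the same key within one merge would be ambiguous.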
Use merge in streaming queries with foreachBatch
You can use a combination of merge and foreachBatch (see foreachBatch for more information) to write complex upserts from a streaming query into a Delta Lake table. For example:
- Writing streaming aggregates in Update Mode: This is much more efficient than Complete Mode.
import io.delta.tables._
import org.apache.spark.sql.DataFrame

val deltaTable = DeltaTable.forName("aggregates")

// Function to upsert `microBatchOutputDF` into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
- Writing a stream of database changes into a Delta Lake table: The earlier merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta Lake table.
Make sure that your merge statement inside foreachBatch is idempotent, as reprocessing of the streaming query can apply the operation on the same batch of data multiple times.
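As a rough illustration of what idempotent means here, the following plain-Java sketch contrasts a keyed upsert with a blind append when the same batch is replayed. It does not use Spark or the Delta Lake API; IdempotencyModel, upsert, and append are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Plain-Java contrast between a keyed upsert and a blind append. Illustration only.
final class IdempotencyModel {

    // Keyed upsert: existing keys are overwritten, new keys are added.
    // Replaying the same batch leaves the table unchanged.
    static void upsert(Map<String, Long> table, Map<String, Long> batch) {
        table.putAll(batch);
    }

    // Blind append: every row of the batch is added again on each replay.
    static void append(List<String> table, List<String> batch) {
        table.addAll(batch);
    }
}
```

An upsert keyed on a unique column behaves like the first method, so a replayed micro-batch produces the same table. A merge whose update expressions accumulate, for example by adding the source value to the existing target value, would not have this property.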