Table deletes, updates, and merges
Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.
Delete from a table
You can remove data that matches a predicate from a Delta table. For instance, in a table named people10m or a path at /tmp/delta/people-10m, to delete all rows corresponding to people with a value in the birthDate column from before 1955, you can run the following:
SQL:

```sql
DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
```

See Configure SparkSession for the steps to enable support for SQL commands.
Python:

```python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1955-01-01')
```

Scala:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
```

Java:

```java
import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'");

// Declare the predicate by using Spark SQL functions.
deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));
```

See the Delta Lake APIs for details.
Update a table
You can update data that matches a predicate in a Delta table. For example, in a table named people10m or a path at /tmp/delta/people-10m, to change an abbreviation in the gender column from M or F to Male or Female, you can run the following:
SQL:

```sql
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
```

See Configure SparkSession for the steps to enable support for SQL commands.
Python:

```python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
```

Scala:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'"))

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")))
```

Java:

```java
import io.delta.tables.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  new HashMap<String, String>() {{
    put("gender", "'Female'");
  }});

// Declare the predicate by using Spark SQL functions.
deltaTable.update(
  functions.col("gender").equalTo("M"),
  new HashMap<String, Column>() {{
    put("gender", functions.lit("Male"));
  }});
```

See the Delta Lake APIs for details.
Upsert into a table using merge
You can upsert data from a source table, view, or DataFrame into a target Delta table by using the MERGE SQL operation. Delta Lake supports inserts, updates, and deletes in MERGE, and it supports extended syntax beyond the SQL standard to facilitate advanced use cases.
Suppose you have a source table named people10mupdates or a source path at /tmp/delta/people-10m-updates that contains new data for a target table named people10m or a target path at /tmp/delta/people-10m. Some of these new records may already be present in the target data. To merge the new data, you want to update rows where the person’s id is already present and insert the new rows where no matching id is present. You can run the following:
SQL:

```sql
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED THEN
  INSERT (
    id, firstName, middleName, lastName, gender, birthDate, ssn, salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
```

See Configure SparkSession for the steps to enable support for SQL commands.
Python:

```python
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
```

Scala:

```scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()
```

Java:

```java
import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
Dataset<Row> dfUpdates = spark.read().format("delta").load("/tmp/delta/people-10m-updates");

deltaTable
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .execute();
```

See the Delta Lake APIs for Scala, Java, and Python syntax details.
Modify all unmatched rows using merge
You can use the WHEN NOT MATCHED BY SOURCE clause to UPDATE or DELETE records in the target table that do not have corresponding records in the source table. We recommend adding an optional conditional clause to avoid fully rewriting the target table.
The following code example shows the basic syntax of using this for deletes: it overwrites the target table with the contents of the source table and deletes unmatched records in the target table.
SQL:

```sql
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
```

Python:

```python
(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
```

Scala:

```scala
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
```

The following example adds conditions to the WHEN NOT MATCHED BY SOURCE clause and specifies values to update in unmatched target rows.
SQL:

```sql
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'
```

Python:

```python
(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition = "target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
```

Scala:

```scala
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'"
  ))
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
```

Operation semantics
Here is a detailed description of the merge programmatic operation.
- There can be any number of `whenMatched` and `whenNotMatched` clauses.

- `whenMatched` clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.

  - `whenMatched` clauses can have at most one `update` and one `delete` action. The `update` action in `merge` only updates the specified columns (similar to the `update` operation) of the matched target row. The `delete` action deletes the matched row.

  - Each `whenMatched` clause can have an optional condition. If this clause condition exists, the `update` or `delete` action is executed for any matching source-target row pair only when the clause condition is true.

  - If there are multiple `whenMatched` clauses, then they are evaluated in the order they are specified. All `whenMatched` clauses, except the last one, must have conditions.

  - If none of the `whenMatched` conditions evaluate to true for a source and target row pair that matches the merge condition, then the target row is left unchanged.

  - To update all the columns of the target Delta table with the corresponding columns of the source dataset, use `whenMatched(...).updateAll()`. This is equivalent to `whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))` for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.

- `whenNotMatched` clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.

  - `whenNotMatched` clauses can have only the `insert` action. The new row is generated based on the specified columns and corresponding expressions. You do not need to specify all the columns in the target table. For unspecified target columns, `NULL` is inserted.

  - Each `whenNotMatched` clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored.

  - If there are multiple `whenNotMatched` clauses, then they are evaluated in the order they are specified. All `whenNotMatched` clauses, except the last one, must have conditions.

  - To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use `whenNotMatched(...).insertAll()`. This is equivalent to `whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))` for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.

- `whenNotMatchedBySource` clauses are executed when a target row does not match any source row based on the merge condition. These clauses have the following semantics.

  - `whenNotMatchedBySource` clauses can specify `delete` and `update` actions.

  - Each `whenNotMatchedBySource` clause can have an optional condition. If the clause condition is present, a target row is modified only if that condition is true for that row. Otherwise, the target row is left unchanged.

  - If there are multiple `whenNotMatchedBySource` clauses, then they are evaluated in the order they are specified. All `whenNotMatchedBySource` clauses, except the last one, must have conditions.

  - By definition, `whenNotMatchedBySource` clauses do not have a source row to pull column values from, and so source columns can't be referenced. For each column to be modified, you can either specify a literal or perform an action on the target column, such as `SET target.deleted_count = target.deleted_count + 1`.
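For example, here is a minimal sketch that combines conditional clauses of all three kinds in Python. The table path, column names, and the `updatesDF` contents are hypothetical, not taken from the examples above.

```python
from delta.tables import *

# Hypothetical target table with columns (key, value, status).
targetTable = DeltaTable.forPath(spark, "/tmp/delta/events")

# Hypothetical source of changes with columns (key, value, deleted).
updatesDF = spark.createDataFrame(
    [(1, "a", False), (2, None, True), (3, "c", False)],
    ["key", "value", "deleted"])

(targetTable.alias("t")
  .merge(updatesDF.alias("s"), "t.key = s.key")
  # whenMatched clauses run in the order given; only the last may omit a condition.
  .whenMatchedDelete(condition = "s.deleted = true")
  .whenMatchedUpdate(set = { "value": "s.value" })
  # Insert only source rows that are not deletions.
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = { "key": "s.key", "value": "s.value", "status": "'active'" })
  # Target rows with no matching source row are marked inactive.
  .whenNotMatchedBySourceUpdate(set = { "status": "'inactive'" })
  .execute())
```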
Schema validation
`merge` automatically validates that the schema of the data generated by insert and update expressions is compatible with the schema of the table. It uses the following rules to determine whether the merge operation is compatible:
- For `update` and `insert` actions, the specified target columns must exist in the target Delta table.

- For `updateAll` and `insertAll` actions, the source dataset must have all the columns of the target Delta table. The source dataset can have extra columns, and they are ignored (see the sketch after this list).

  If you do not want the extra columns to be ignored and instead want to update the target table schema to include new columns, see Automatic schema evolution.

- For all actions, if the data types generated by the expressions producing the target columns differ from the corresponding columns in the target Delta table, `merge` tries to cast them to the types in the table.
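For instance, here is a minimal sketch of the second rule, assuming a hypothetical target table at /tmp/delta/target with columns (key, value) and a source DataFrame that carries an extra new_value column:

```python
from delta.tables import *

# Hypothetical target table with columns (key, value).
targetTable = DeltaTable.forPath(spark, "/tmp/delta/target")

# Hypothetical source with an extra column that is not in the target schema.
sourceDF = spark.createDataFrame(
    [(1, "a", "x"), (2, "b", "y")],
    ["key", "value", "new_value"])

# Without schema evolution, new_value is ignored by updateAll/insertAll and the
# target keeps its (key, value) schema; with schema evolution it would be added.
(targetTable.alias("t")
  .merge(sourceDF.alias("s"), "t.key = s.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())
```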
Automatic schema evolution
Schema evolution allows users to resolve schema mismatches between the target and source table in merge. It handles the following two cases:
- A column in the source table is not present in the target table. The new column is added to the target schema, and its values are inserted or updated using the source values.
- A column in the target table is not present in the source table. The target schema is left unchanged; the values in the additional target column are either left unchanged (for `UPDATE`) or set to `NULL` (for `INSERT`).
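Schema evolution in merge is opt-in. A minimal sketch of enabling it for the current session, using the configuration key from open source Delta Lake (verify the exact key against your Delta Lake version):

```python
# Allow merge to add new source columns to the target table schema.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```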
Here are a few examples of the effects of the merge operation with and without schema evolution.

| Columns | Query (in SQL) | Behavior without schema evolution (default) | Behavior with schema evolution |
|---|---|---|---|
| Target: key, value. Source: key, value, new_value | `MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *` | The table schema remains unchanged; only columns key, value are updated/inserted. | The table schema is changed to (key, value, new_value). Existing records with matches are updated with the value and new_value in the source. New rows are inserted with the schema (key, value, new_value). |
| Target: key, old_value. Source: key, new_value | `MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *` | UPDATE and INSERT actions throw an error because the target column old_value is not in the source. | The table schema is changed to (key, old_value, new_value). Existing records with matches are updated with the new_value in the source, leaving old_value unchanged. New records are inserted with the specified key, new_value, and NULL for the old_value. |
| Target: key, old_value. Source: key, new_value | `MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value` | UPDATE throws an error because column new_value does not exist in the target table. | The table schema is changed to (key, old_value, new_value). Existing records with matches are updated with the new_value in the source, leaving old_value unchanged, and unmatched records have NULL entered for new_value. |
| Target: key, old_value. Source: key, new_value | `MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value)` | INSERT throws an error because column new_value does not exist in the target table. | The table schema is changed to (key, old_value, new_value). New records are inserted with the specified key, new_value, and NULL for the old_value. Existing records have NULL entered for new_value, leaving old_value unchanged. See note (1). |
Special considerations for schemas that contain arrays of structs
Delta MERGE INTO supports resolving struct fields by name and evolving schemas for arrays of structs. With schema evolution enabled, target table schemas will evolve for arrays of structs, which also works with any nested structs inside of arrays.
Here are a few examples of the effects of merge operations with and without schema evolution for arrays of structs.
| Source schema | Target schema | Behavior without schema evolution (default) | Behavior with schema evolution |
|---|---|---|---|
| `array<struct<b: string, a: string>>` | `array<struct<a: int, b: int>>` | The table schema remains unchanged. Columns will be resolved by name and updated or inserted. | The table schema remains unchanged. Columns will be resolved by name and updated or inserted. |
| `array<struct<a: int, c: string, d: string>>` | `array<struct<a: string, b: string>>` | update and insert throw errors because c and d do not exist in the target table. | The table schema is changed to `array<struct<a: string, b: string, c: string, d: string>>`. c and d are inserted as NULL for existing entries in the target table. update and insert fill entries in the source table with `a` cast to string and `b` as NULL. |
| `array<struct<a: string, b: struct<c: string, d: string>>>` | `array<struct<a: string, b: struct<c: string>>>` | update and insert throw errors because d does not exist in the target table. | The target table schema is changed to `array<struct<a: string, b: struct<c: string, d: string>>>`. d is inserted as NULL for existing entries in the target table. |
Performance tuning
You can reduce the time taken by merge using the following approaches:
- Reduce the search space for matches: By default, the `merge` operation searches the entire Delta table to find matches in the source table. One way to speed up `merge` is to reduce the search space by adding known constraints in the match condition. For example, suppose you have a table that is partitioned by `country` and `date` and you want to use `merge` to update information for the last day and a specific country. Adding the condition `events.date = current_date() AND events.country = 'USA'` will make the query faster, as it looks for matches only in the relevant partitions. Furthermore, it will also reduce the chances of conflicts with other concurrent operations. See Concurrency control for more details.

- Compact files: If the data is stored in many small files, reading the data to search for matches can become slow. You can compact small files into larger files to improve read throughput. See Compact files for details.

- Control the shuffle partitions for writes: The `merge` operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration `spark.sql.shuffle.partitions`. Setting this parameter not only controls the parallelism but also determines the number of output files. Increasing the value increases parallelism but also generates a larger number of smaller data files.

- Repartition output data before write: For partitioned tables, `merge` can produce a much larger number of small files than the number of shuffle partitions. This is because every shuffle task can write multiple files in multiple partitions, which can become a performance bottleneck. In many cases, it helps to repartition the output data by the table's partition columns before writing it. You enable this by setting the Spark session configuration `spark.databricks.delta.merge.repartitionBeforeWrite.enabled` to `true` (see the configuration sketch after this list).
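The last two settings are ordinary Spark session configurations. As a minimal sketch, assuming a running `spark` session (the shuffle-partition value is illustrative, not a recommendation):

```python
# Controls the number of shuffle tasks and, indirectly, the number of output files.
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Repartition merge output by the table's partition columns before writing,
# as described in the last item above.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
```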
Merge examples
Here are a few examples of how to use merge in different scenarios.
Data deduplication when writing into Delta tables
A common ETL use case is to collect logs into a Delta table by appending them to a table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.
SQL:

```sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
```

Python:

```python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
```

Scala:

```scala
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
```

Java:

```java
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();
```

If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.
SQL:

```sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
```

Python:

```python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
```

Scala:

```scala
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
```

Java:

```java
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();
```

This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table. Furthermore, you can use this insert-only merge with Structured Streaming to perform continuous deduplication of the logs.
- In a streaming query, you can use the merge operation in `foreachBatch` to continuously write any streaming data to a Delta table with deduplication. See the following streaming example for more information on `foreachBatch`.
- In another streaming query, you can continuously read deduplicated data from this Delta table. This is possible because an insert-only merge only appends new data to the Delta table.
Slowly changing data (SCD) Type 2 operation into Delta tables
Another common operation is SCD Type 2, which maintains a history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.
Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.
Python:

```python
customersTable = ...  # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)

updatesDF = ...       # DataFrame with schema (customerId, address, effectiveDate)

# Rows to INSERT new addresses of existing customers
newAddressesToInsert = updatesDF \
  .alias("updates") \
  .join(customersTable.toDF().alias("customers"), "customerid") \
  .where("customers.current = true AND updates.address <> customers.address")

# Stage the update by unioning two sets of rows
# 1. Rows that will be inserted in the whenNotMatched clause
# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
stagedUpdates = (
  newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1
  .union(updatesDF.selectExpr("updates.customerId as mergeKey", "*"))  # Rows for 2.
)

# Apply SCD Type 2 operation using merge
customersTable.alias("customers").merge(
  stagedUpdates.alias("staged_updates"),
  "customers.customerId = mergeKey") \
.whenMatchedUpdate(
  condition = "customers.current = true AND customers.address <> staged_updates.address",
  set = {
    # Set current to false and endDate to source's effective date.
    "current": "false",
    "endDate": "staged_updates.effectiveDate"
  }
).whenNotMatchedInsert(
  values = {
    # Set current to true along with the new address and its effective date.
    "customerid": "staged_updates.customerId",
    "address": "staged_updates.address",
    "current": "true",
    "effectiveDate": "staged_updates.effectiveDate",
    "endDate": "null"
  }
).execute()
```

Scala:

```scala
val customersTable: DeltaTable = ...  // table with schema (customerId, address, current, effectiveDate, endDate)

val updatesDF: DataFrame = ...        // DataFrame with schema (customerId, address, effectiveDate)

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerid")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.
  .union(
    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.
  )

// Apply SCD Type 2 operation using merge
customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(
    // Set current to false and endDate to source's effective date.
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(
    // Set current to true along with the new address and its effective date.
    "customerid" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",
    "endDate" -> "null"))
  .execute()
```

Write change data into a Delta table
Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta table. You can do this using merge as follows.
Python:

```python
from pyspark.sql.functions import *

deltaTable = ...  # DeltaTable with schema (key, value)

# DataFrame with changes having following columns
# - key: key of the change
# - time: time of change for ordering between changes (can be replaced by other ordering id)
# - newValue: updated or inserted value if key was not deleted
# - deleted: true if the key was deleted, false if the key was inserted or updated
changesDF = spark.table("changes")

# Find the latest change for each key based on the timestamp
# Note: For nested structs, max on struct is computed as
# max on first struct field, if equal fall back to second fields, and so on.
latestChangeForEachKey = changesDF \
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \
  .groupBy("key") \
  .agg(max("otherCols").alias("latest")) \
  .select("key", "latest.*")

deltaTable.alias("t").merge(
    latestChangeForEachKey.alias("s"),
    "s.key = t.key") \
  .whenMatchedDelete(condition = "s.deleted = true") \
  .whenMatchedUpdate(set = {
    "key": "s.key",
    "value": "s.newValue"
  }) \
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = {
      "key": "s.key",
      "value": "s.newValue"
    }
  ).execute()
```

Scala:

```scala
import org.apache.spark.sql.functions._

val deltaTable: DeltaTable = ...  // DeltaTable with schema (key, value)

// DataFrame with changes having following columns
// - key: key of the change
// - time: time of change for ordering between changes (can be replaced by other ordering id)
// - newValue: updated or inserted value if key was not deleted
// - deleted: true if the key was deleted, false if the key was inserted or updated
val changesDF: DataFrame = ...

// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols")
  .groupBy("key")
  .agg(max("otherCols").as("latest"))
  .selectExpr("key", "latest.*")

deltaTable.as("t")
  .merge(
    latestChangeForEachKey.as("s"),
    "s.key = t.key")
  .whenMatched("s.deleted = true")
  .delete()
  .whenMatched()
  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .whenNotMatched("s.deleted = false")
  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .execute()
```

Upsert from streaming queries using foreachBatch
You can use a combination of merge and foreachBatch (see foreachbatch for more information) to write complex upserts from a streaming query into a Delta table. For example:
- Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.

Python:

```python
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()
```

Scala:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
```

- Write a stream of database changes into a Delta table: The merge query for writing change data can be used in `foreachBatch` to continuously apply a stream of changes to a Delta table.

- Write a stream of data into a Delta table with deduplication: The insert-only merge query for deduplication can be used in `foreachBatch` to continuously write data (with duplicates) to a Delta table with automatic deduplication.