Welcome to Delta Lake’s Python documentation page¶
DeltaTable¶
-
class
delta.tables.
DeltaTable
(spark, jdt)¶ Main class for programmatically interacting with Delta tables. You can create DeltaTable instances using the path of the Delta table.:
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
In addition, you can convert an existing Parquet table in place into a Delta table.:
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
New in version 0.4.
Note
Evolving
-
toDF
()¶ Get a DataFrame representation of this Delta table.
Note
Evolving
New in version 0.4.
-
alias
(aliasName)¶ Apply an alias to the Delta table.
Note
Evolving
New in version 0.4.
-
delete
(condition=None)¶ Delete data from the table that match the given
condition
.Example:
deltaTable.delete("date < '2017-01-01'") # predicate using SQL formatted string deltaTable.delete(col("date") < "2017-01-01") # predicate using Spark SQL functions
Parameters: condition (str or pyspark.sql.Column) – condition of the update Note
Evolving
New in version 0.4.
-
update
(condition=None, set=None)¶ Update data from the table on the rows that match the given
condition
, which performs the rules defined byset
.Example:
# condition using SQL formatted string deltaTable.update( condition = "eventType = 'clck'", set = { "eventType": "'click'" } ) # condition using Spark SQL functions deltaTable.update( condition = col("eventType") == "clck", set = { "eventType": lit("click") } )
Parameters: - condition (str or pyspark.sql.Column) – Optional condition of the update
- set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
Note
Evolving
New in version 0.4.
-
merge
(source, condition)¶ Merge data from the source DataFrame based on the given merge condition. This returns a
DeltaMergeBuilder
object that can be used to specify the update, delete, or insert actions to be performed on rows based on whether the rows matched the condition or not. SeeDeltaMergeBuilder
for a full description of this operation and what combinations of update, delete and insert operations are allowed.Example 1 with conditions and update expressions as SQL formatted string:
deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = "events.eventId = updates.eventId" ).whenMatchedUpdate(set = { "data": "updates.data", "count": "events.count + 1" } ).whenNotMatchedInsert(values = { "date": "updates.date", "eventId": "updates.eventId", "data": "updates.data", "count": "1" } ).execute()
Example 2 with conditions and update expressions as Spark SQL functions:
from pyspark.sql.functions import * deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = expr("events.eventId = updates.eventId") ).whenMatchedUpdate(set = { "data" : col("updates.data"), "count": col("events.count") + 1 } ).whenNotMatchedInsert(values = { "date": col("updates.date"), "eventId": col("updates.eventId"), "data": col("updates.data"), "count": lit("1") } ).execute()
Parameters: - source (pyspark.sql.DataFrame) – Source DataFrame
- condition (str or pyspark.sql.Column) – Condition to match sources rows with the Delta table rows.
Returns: builder object to specify whether to update, delete or insert rows based on whether the condition matched or not
Return type: Note
Evolving
New in version 0.4.
-
vacuum
(retentionHours=None)¶ Recursively delete files and directories in the table that are not needed by the table for maintaining older versions up to the given retention threshold. This method will return an empty DataFrame on successful completion.
Example:
deltaTable.vacuum() # vacuum files not required by versions more than 7 days old deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
Parameters: retentionHours – Optional number of hours retain history. If not specified, then the default retention period of 168 hours (7 days) will be used. Note
Evolving
New in version 0.4.
-
history
(limit=None)¶ Get the information of the latest limit commits on this table as a Spark DataFrame. The information is in reverse chronological order.
Example:
fullHistoryDF = deltaTable.history() # get the full history of the table lastOperationDF = deltaTable.history(1) # get the last operation
Parameters: limit – Optional, number of latest commits to returns in the history. Returns: Table’s commit history. See the online Delta Lake documentation for more details. Return type: pyspark.sql.DataFrame Note
Evolving
New in version 0.4.
-
classmethod
convertToDelta
(sparkSession, identifier, partitionSchema=None)¶ Create a DeltaTable from the given parquet table. Takes an existing parquet table and constructs a delta transaction log in the base path of the table. Note: Any changes to the table during the conversion process may not result in a consistent state at the end of the conversion. Users should stop any changes to the table before the conversion is started.
Example:
# Convert unpartitioned parquet table at path 'path/to/table' deltaTable = DeltaTable.convertToDelta( spark, "parquet.`path/to/table`") # Convert partitioned parquet table at path 'path/to/table' and partitioned by # integer column named 'part' partitionedDeltaTable = DeltaTable.convertToDelta( spark, "parquet.`path/to/table`", "part int")
Parameters: - sparkSession (pyspark.sql.SparkSession) – SparkSession to use for the conversion
- identifier (str) – Parquet table identifier formatted as “parquet.`path`”
- partitionSchema –
- partitionSchema – Hive DDL formatted string, or pyspark.sql.types.StructType
Returns: DeltaTable representing the converted Delta table
Return type: Note
Evolving
New in version 0.4.
-
classmethod
forPath
(sparkSession, path)¶ Create a DeltaTable for the data at the given path using the given SparkSession.
Parameters: sparkSession (pyspark.sql.SparkSession) – SparkSession to use for loading the table Returns: loaded Delta table Return type: DeltaTable
Example:
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
Note
Evolving
New in version 0.4.
-
classmethod
isDeltaTable
(sparkSession, identifier)¶ Check if the provided identifier string, in this case a file path, is the root of a Delta table using the given SparkSession.
Parameters: - sparkSession – SparkSession to use to perform the check
- path – location of the table
Returns: If the table is a delta table or not
Return type: bool
Example:
DeltaTable.isDeltaTable(spark, "/path/to/table")
Note
Evolving
New in version 0.4.
-
-
class
delta.tables.
DeltaMergeBuilder
(spark, jbuilder)¶ Builder to specify how to merge data from source DataFrame into the target Delta table. Use
delta.tables.DeltaTable.merge()
to create an object of this class. Using this builder, you can specify 1, 2 or 3when
clauses of which there can be at most 2whenMatched
clauses and at most 1whenNotMatched
clause. Here are the constraints on these clauses.Constraints in the
whenMatched
clauses:There can be at most one
update
action and onedelete
action in whenMatched clauses.Each
whenMatched
clause can have an optional condition. However, if there are twowhenMatched
clauses, then the first one must have a condition.When there are two
whenMatched
clauses and there are conditions (or the lack of) such that a row matches both clauses, then the first clause/action is executed. In other words, the order of thewhenMatched
clauses matter.If none of the
whenMatched
clauses match a source-target row pair that satisfy the merge condition, then the target rows will not be updated or deleted.If you want to update all the columns of the target Delta table with the corresponding column of the source DataFrame, then you can use the
whenMatchedUpdateAll()
. This is equivalent to:whenMatchedUpdate(set = { "col1": "source.col1", "col2": "source.col2", ... # for all columns in the delta table })
Constraints in the
whenNotMatched
clauses:This clause can have only an
insert
action, which can have an optional condition.If
whenNotMatchedInsert
is not present or if it is present but the non-matching source row does not satisfy the condition, then the source row is not inserted.If you want to insert all the columns of the target Delta table with the corresponding column of the source DataFrame, then you can use
whenNotMatchedInsertAll()
. This is equivalent to:whenMatchedInsert(values = { "col1": "source.col1", "col2": "source.col2", ... # for all columns in the delta table })
Example 1 with conditions and update expressions as SQL formatted string:
deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = "events.eventId = updates.eventId" ).whenMatchedUpdate(set = { "data": "updates.data", "count": "events.count + 1" } ).whenNotMatchedInsert(values = { "date": "updates.date", "eventId": "updates.eventId", "data": "updates.data", "count": "1" } ).execute()
Example 2 with conditions and update expressions as Spark SQL functions:
from pyspark.sql.functions import * deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = expr("events.eventId = updates.eventId") ).whenMatchedUpdate(set = { "data" : col("updates.data"), "count": col("events.count") + 1 } ).whenNotMatchedInsert(values = { "date": col("updates.date"), "eventId": col("updates.eventId"), "data": col("updates.data"), "count": lit("1") } ).execute()
New in version 0.4.
Note
Evolving
-
whenMatchedUpdate
(condition=None, set=None)¶ Update a matched table row based on the rules defined by
set
. If acondition
is specified, then it must evaluate to true for the row to be updated.See
DeltaMergeBuilder
for complete usage details.Parameters: - condition (str or pyspark.sql.Column) – Optional condition of the update
- set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
Returns: this builder
Note
Evolving
New in version 0.4.
-
whenMatchedUpdateAll
(condition=None)¶ Update all the columns of the matched table row with the values of the corresponding columns in the source row. If a
condition
is specified, then it must be true for the new row to be updated.See
DeltaMergeBuilder
for complete usage details.Parameters: condition (str or pyspark.sql.Column) – Optional condition of the insert Returns: this builder Note
Evolving
New in version 0.4.
-
whenMatchedDelete
(condition=None)¶ Delete a matched row from the table only if the given
condition
(if specified) is true for the matched row.See
DeltaMergeBuilder
for complete usage details.Parameters: condition (str or pyspark.sql.Column) – Optional condition of the delete Returns: this builder Note
Evolving
New in version 0.4.
-
whenNotMatchedInsert
(condition=None, values=None)¶ Insert a new row to the target table based on the rules defined by
values
. If acondition
is specified, then it must evaluate to true for the new row to be inserted.See
DeltaMergeBuilder
for complete usage details.Parameters: - condition (str or pyspark.sql.Column) – Optional condition of the insert
- values (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
Returns: this builder
Note
Evolving
New in version 0.4.
-
whenNotMatchedInsertAll
(condition=None)¶ Insert a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a
condition
is specified, then it must evaluate to true for the new row to be inserted.See
DeltaMergeBuilder
for complete usage details.Parameters: condition (str or pyspark.sql.Column) – Optional condition of the insert Returns: this builder Note
Evolving
New in version 0.4.
-
execute
()¶ Execute the merge operation based on the built matched and not matched actions.
See
DeltaMergeBuilder
for complete usage details.Note
Evolving
New in version 0.4.