Welcome to Delta Lake’s Python documentation page

DeltaTable

class delta.tables.DeltaTable(spark: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, jdt: JavaObject)

Main class for programmatically interacting with Delta tables. You can create DeltaTable instances using the path of the Delta table.:

deltaTable = DeltaTable.forPath(spark, "/path/to/table")

In addition, you can convert an existing Parquet table in place into a Delta table.:

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

New in version 0.4.

toDF() → <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230413d90>

Get a DataFrame representation of this Delta table.

alias(aliasName: str) → delta.tables.DeltaTable

Apply an alias to the Delta table.

generate(mode: str) → None

Generate manifest files for the given delta table.

Parameters:mode

mode for the type of manifest file to be generated The valid modes are as follows (not case sensitive):

  • ”symlink_format_manifest”: This will generate manifests in symlink format
    for Presto and Athena read support.

See the online documentation for more information.

delete(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None) → None

Delete data from the table that match the given condition.

Example:

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions
Parameters:condition (str or pyspark.sql.Column) – condition of the update
update(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None, set: Optional[Dict[str, Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>]]] = None) → None

Update data from the table on the rows that match the given condition, which performs the rules defined by set.

Example:

# condition using SQL formatted string
deltaTable.update(
    condition = "eventType = 'clck'",
    set = { "eventType": "'click'" } )

# condition using Spark SQL functions
deltaTable.update(
    condition = col("eventType") == "clck",
    set = { "eventType": lit("click") } )
Parameters:
  • condition (str or pyspark.sql.Column) – Optional condition of the update
  • set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.

New in version 0.4.

merge(source: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230413d90>, condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>]) → delta.tables.DeltaMergeBuilder

Merge data from the source DataFrame based on the given merge condition. This returns a DeltaMergeBuilder object that can be used to specify the update, delete, or insert actions to be performed on rows based on whether the rows matched the condition or not. See DeltaMergeBuilder for a full description of this operation and what combinations of update, delete and insert operations are allowed.

Example 1 with conditions and update expressions as SQL formatted string:

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = "events.eventId = updates.eventId"
  ).whenMatchedUpdate(set =
    {
      "data": "updates.data",
      "count": "events.count + 1"
    }
  ).whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data",
      "count": "1"
    }
  ).execute()

Example 2 with conditions and update expressions as Spark SQL functions:

from pyspark.sql.functions import *

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = expr("events.eventId = updates.eventId")
  ).whenMatchedUpdate(set =
    {
      "data" : col("updates.data"),
      "count": col("events.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "date": col("updates.date"),
      "eventId": col("updates.eventId"),
      "data": col("updates.data"),
      "count": lit("1")
    }
  ).execute()
Parameters:
  • source (pyspark.sql.DataFrame) – Source DataFrame
  • condition (str or pyspark.sql.Column) – Condition to match sources rows with the Delta table rows.
Returns:

builder object to specify whether to update, delete or insert rows based on whether the condition matched or not

Return type:

delta.tables.DeltaMergeBuilder

vacuum(retentionHours: Optional[float] = None) → <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230413d90>

Recursively delete files and directories in the table that are not needed by the table for maintaining older versions up to the given retention threshold. This method will return an empty DataFrame on successful completion.

Example:

deltaTable.vacuum()     # vacuum files not required by versions more than 7 days old

deltaTable.vacuum(100)  # vacuum files not required by versions more than 100 hours old
Parameters:retentionHours – Optional number of hours retain history. If not specified, then the default retention period of 168 hours (7 days) will be used.
history(limit: Optional[int] = None) → <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230413d90>

Get the information of the latest limit commits on this table as a Spark DataFrame. The information is in reverse chronological order.

Example:

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation
Parameters:limit – Optional, number of latest commits to returns in the history.
Returns:Table’s commit history. See the online Delta Lake documentation for more details.
Return type:pyspark.sql.DataFrame
classmethod convertToDelta(sparkSession: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, identifier: str, partitionSchema: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041d090>, None] = None) → delta.tables.DeltaTable

Create a DeltaTable from the given parquet table. Takes an existing parquet table and constructs a delta transaction log in the base path of the table. Note: Any changes to the table during the conversion process may not result in a consistent state at the end of the conversion. Users should stop any changes to the table before the conversion is started.

Example:

# Convert unpartitioned parquet table at path 'path/to/table'
deltaTable = DeltaTable.convertToDelta(
    spark, "parquet.`path/to/table`")

# Convert partitioned parquet table at path 'path/to/table' and partitioned by
# integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(
    spark, "parquet.`path/to/table`", "part int")
Parameters:
  • sparkSession (pyspark.sql.SparkSession) – SparkSession to use for the conversion
  • identifier (str) – Parquet table identifier formatted as “parquet.`path`”
  • partitionSchema – Hive DDL formatted string, or pyspark.sql.types.StructType
Returns:

DeltaTable representing the converted Delta table

Return type:

DeltaTable

classmethod forPath(sparkSession: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, path: str) → delta.tables.DeltaTable

Create a DeltaTable for the data at the given path using the given SparkSession.

Parameters:sparkSession (pyspark.sql.SparkSession) – SparkSession to use for loading the table
Returns:loaded Delta table
Return type:DeltaTable

Example:

deltaTable = DeltaTable.forPath(spark, "/path/to/table")
classmethod forName(sparkSession: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, tableOrViewName: str) → delta.tables.DeltaTable

Create a DeltaTable using the given table or view name using the given SparkSession.

Parameters:
  • sparkSession – SparkSession to use for loading the table
  • tableOrViewName – name of the table or view
Returns:

loaded Delta table

Return type:

DeltaTable

Example:

deltaTable = DeltaTable.forName(spark, "tblName")
classmethod create(sparkSession: Optional[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>] = None) → delta.tables.DeltaTableBuilder

Return DeltaTableBuilder object that can be used to specify the table name, location, columns, partitioning columns, table comment, and table properties to create a Delta table, error if the table exists (the same as SQL CREATE TABLE).

See DeltaTableBuilder for a full description and examples of this operation.

Parameters:sparkSession – SparkSession to use for creating the table
Returns:an instance of DeltaTableBuilder
Return type:DeltaTableBuilder

Note

Evolving

classmethod createIfNotExists(sparkSession: Optional[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>] = None) → delta.tables.DeltaTableBuilder

Return DeltaTableBuilder object that can be used to specify the table name, location, columns, partitioning columns, table comment, and table properties to create a Delta table, if it does not exists (the same as SQL CREATE TABLE IF NOT EXISTS).

See DeltaTableBuilder for a full description and examples of this operation.

Parameters:sparkSession – SparkSession to use for creating the table
Returns:an instance of DeltaTableBuilder
Return type:DeltaTableBuilder

Note

Evolving

classmethod replace(sparkSession: Optional[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>] = None) → delta.tables.DeltaTableBuilder

Return DeltaTableBuilder object that can be used to specify the table name, location, columns, partitioning columns, table comment, and table properties to replace a Delta table, error if the table doesn’t exist (the same as SQL REPLACE TABLE).

See DeltaTableBuilder for a full description and examples of this operation.

Parameters:sparkSession – SparkSession to use for creating the table
Returns:an instance of DeltaTableBuilder
Return type:DeltaTableBuilder

Note

Evolving

classmethod createOrReplace(sparkSession: Optional[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>] = None) → delta.tables.DeltaTableBuilder

Return DeltaTableBuilder object that can be used to specify the table name, location, columns, partitioning columns, table comment, and table properties replace a Delta table, error if the table doesn’t exist (the same as SQL REPLACE TABLE).

See DeltaTableBuilder for a full description and examples of this operation.

Parameters:sparkSession – SparkSession to use for creating the table
Returns:an instance of DeltaTableBuilder
Return type:DeltaTableBuilder

Note

Evolving

classmethod isDeltaTable(sparkSession: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, identifier: str) → bool

Check if the provided identifier string, in this case a file path, is the root of a Delta table using the given SparkSession.

Parameters:
  • sparkSession – SparkSession to use to perform the check
  • path – location of the table
Returns:

If the table is a delta table or not

Return type:

bool

Example:

DeltaTable.isDeltaTable(spark, "/path/to/table")
upgradeTableProtocol(readerVersion: int, writerVersion: int) → None

Updates the protocol version of the table to leverage new features. Upgrading the reader version will prevent all clients that have an older version of Delta Lake from accessing this table. Upgrading the writer version will prevent older versions of Delta Lake to write to this table. The reader or writer version cannot be downgraded.

See online documentation and Delta’s protocol specification at PROTOCOL.md for more details.

class delta.tables.DeltaMergeBuilder(spark: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, jbuilder: JavaObject)

Builder to specify how to merge data from source DataFrame into the target Delta table. Use delta.tables.DeltaTable.merge() to create an object of this class. Using this builder, you can specify 1, 2 or 3 when clauses of which there can be at most 2 whenMatched clauses and at most 1 whenNotMatched clause. Here are the constraints on these clauses.

  • Constraints in the whenMatched clauses:

    • There can be at most one update action and one delete action in whenMatched clauses.

    • Each whenMatched clause can have an optional condition. However, if there are two whenMatched clauses, then the first one must have a condition.

    • When there are two whenMatched clauses and there are conditions (or the lack of) such that a row matches both clauses, then the first clause/action is executed. In other words, the order of the whenMatched clauses matter.

    • If none of the whenMatched clauses match a source-target row pair that satisfy the merge condition, then the target rows will not be updated or deleted.

    • If you want to update all the columns of the target Delta table with the corresponding column of the source DataFrame, then you can use the whenMatchedUpdateAll(). This is equivalent to:

      whenMatchedUpdate(set = {
        "col1": "source.col1",
        "col2": "source.col2",
        ...    # for all columns in the delta table
      })
      
  • Constraints in the whenNotMatched clauses:

    • This clause can have only an insert action, which can have an optional condition.

    • If whenNotMatchedInsert is not present or if it is present but the non-matching source row does not satisfy the condition, then the source row is not inserted.

    • If you want to insert all the columns of the target Delta table with the corresponding column of the source DataFrame, then you can use whenNotMatchedInsertAll(). This is equivalent to:

      whenNotMatchedInsert(values = {
        "col1": "source.col1",
        "col2": "source.col2",
        ...    # for all columns in the delta table
      })
      

Example 1 with conditions and update expressions as SQL formatted string:

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = "events.eventId = updates.eventId"
  ).whenMatchedUpdate(set =
    {
      "data": "updates.data",
      "count": "events.count + 1"
    }
  ).whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data",
      "count": "1"
    }
  ).execute()

Example 2 with conditions and update expressions as Spark SQL functions:

from pyspark.sql.functions import *

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = expr("events.eventId = updates.eventId")
  ).whenMatchedUpdate(set =
    {
      "data" : col("updates.data"),
      "count": col("events.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "date": col("updates.date"),
      "eventId": col("updates.eventId"),
      "data": col("updates.data"),
      "count": lit("1")
    }
  ).execute()

New in version 0.4.

whenMatchedUpdate(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None, set: Optional[Dict[str, Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>]]] = None) → delta.tables.DeltaMergeBuilder

Update a matched table row based on the rules defined by set. If a condition is specified, then it must evaluate to true for the row to be updated.

See DeltaMergeBuilder for complete usage details.

Parameters:
  • condition (str or pyspark.sql.Column) – Optional condition of the update
  • set (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
Returns:

this builder

New in version 0.4.

whenMatchedUpdateAll(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None) → delta.tables.DeltaMergeBuilder

Update all the columns of the matched table row with the values of the corresponding columns in the source row. If a condition is specified, then it must be true for the new row to be updated.

See DeltaMergeBuilder for complete usage details.

Parameters:condition (str or pyspark.sql.Column) – Optional condition of the insert
Returns:this builder
whenMatchedDelete(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None) → delta.tables.DeltaMergeBuilder

Delete a matched row from the table only if the given condition (if specified) is true for the matched row.

See DeltaMergeBuilder for complete usage details.

Parameters:condition (str or pyspark.sql.Column) – Optional condition of the delete
Returns:this builder
whenNotMatchedInsert(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None, values: Optional[Dict[str, Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>]]] = None) → delta.tables.DeltaMergeBuilder

Insert a new row to the target table based on the rules defined by values. If a condition is specified, then it must evaluate to true for the new row to be inserted.

See DeltaMergeBuilder for complete usage details.

Parameters:
  • condition (str or pyspark.sql.Column) – Optional condition of the insert
  • values (dict with str as keys and str or pyspark.sql.Column as values) – Defines the rules of setting the values of columns that need to be updated. Note: This param is required. Default value None is present to allow positional args in same order across languages.
Returns:

this builder

New in version 0.4.

whenNotMatchedInsertAll(condition: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417090>, None] = None) → delta.tables.DeltaMergeBuilder

Insert a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the new row to be inserted.

See DeltaMergeBuilder for complete usage details.

Parameters:condition (str or pyspark.sql.Column) – Optional condition of the insert
Returns:this builder
execute() → None

Execute the merge operation based on the built matched and not matched actions.

See DeltaMergeBuilder for complete usage details.

class delta.tables.DeltaTableBuilder(spark: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa230417f90>, jbuilder: JavaObject)

Builder to specify how to create / replace a Delta table. You must specify the table name or the path before executing the builder. You can specify the table columns, the partitioning columns, the location of the data, the table comment and the property, and how you want to create / replace the Delta table.

After executing the builder, a DeltaTable object is returned.

Use delta.tables.DeltaTable.create(), delta.tables.DeltaTable.createIfNotExists(), delta.tables.DeltaTable.replace(), delta.tables.DeltaTable.createOrReplace() to create an object of this class.

Example 1 to create a Delta table with separate columns, using the table name:

deltaTable = DeltaTable.create(sparkSession)
    .tableName("testTable")
    .addColumn("c1", dataType = "INT", nullable = False)
    .addColumn("c2", dataType = IntegerType(), generatedAlwaysAs = "c1 + 1")
    .partitionedBy("c1")
    .execute()

Example 2 to replace a Delta table with existing columns, using the location:

df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["key", "value"])

deltaTable = DeltaTable.replace(sparkSession)
    .tableName("testTable")
    .addColumns(df.schema)
    .execute()

New in version 1.0.

Note

Evolving

tableName(identifier: str) → delta.tables.DeltaTableBuilder

Specify the table name. Optionally qualified with a database name [database_name.] table_name.

Parameters:identifier (str) – the table name
Returns:this builder

Note

Evolving

location(location: str) → delta.tables.DeltaTableBuilder

Specify the path to the directory where table data is stored, which could be a path on distributed storage.

Parameters:location (str) – the data stored location
Returns:this builder

Note

Evolving

comment(comment: str) → delta.tables.DeltaTableBuilder

Comment to describe the table.

Parameters:comment (str) – the table comment
Returns:this builder

Note

Evolving

addColumn(colName: str, dataType: Union[str, <sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041d050>], nullable: bool = True, generatedAlwaysAs: Optional[str] = None, comment: Optional[str] = None) → delta.tables.DeltaTableBuilder

Specify a column in the table

Parameters:
  • colName (str) – the column name
  • dataType (str or pyspark.sql.types.DataType) – the column data type
  • nullable (bool) – whether column is nullable
  • generatedAlwaysAs (str) – a SQL expression if the column is always generated as a function of other columns. See online documentation for details on Generated Columns.
  • comment (str) – the column comment
Returns:

this builder

Note

Evolving

addColumns(cols: Union[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041d090>, List[<sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041d190>]]) → delta.tables.DeltaTableBuilder

Specify columns in the table using an existing schema

Parameters:cols (pyspark.sql.types.StructType or a list of pyspark.sql.types.StructType.) – the columns in the existing schema
Returns:this builder

Note

Evolving

partitionedBy(*cols) → delta.tables.DeltaTableBuilder

Specify columns for partitioning

Parameters:cols (str or list name of columns) – the partitioning cols
Returns:this builder

Note

Evolving

property(key: str, value: str) → delta.tables.DeltaTableBuilder

Specify a table property

Parameters:key – the table property key
Returns:this builder

Note

Evolving

execute() → delta.tables.DeltaTable

Execute Table Creation.

Return type:DeltaTable

Note

Evolving

Exceptions

class delta.exceptions.DeltaConcurrentModificationException(*args, **kwargs)

The basic class for all Delta commit conflict exceptions.

New in version 1.0.

Note

Evolving

class delta.exceptions.ConcurrentWriteException(*args, **kwargs)

Thrown when a concurrent transaction has written data after the current transaction read the table.

New in version 1.0.

Note

Evolving

class delta.exceptions.MetadataChangedException(*args, **kwargs)

Thrown when the metadata of the Delta table has changed between the time of read and the time of commit.

New in version 1.0.

Note

Evolving

class delta.exceptions.ProtocolChangedException(*args, **kwargs)

Thrown when the protocol version has changed between the time of read and the time of commit.

New in version 1.0.

Note

Evolving

class delta.exceptions.ConcurrentAppendException(*args, **kwargs)

Thrown when files are added that would have been read by the current transaction.

New in version 1.0.

Note

Evolving

class delta.exceptions.ConcurrentDeleteReadException(*args, **kwargs)

Thrown when the current transaction reads data that was deleted by a concurrent transaction.

New in version 1.0.

Note

Evolving

class delta.exceptions.ConcurrentDeleteDeleteException(*args, **kwargs)

Thrown when the current transaction deletes data that was deleted by a concurrent transaction.

New in version 1.0.

Note

Evolving

class delta.exceptions.ConcurrentTransactionException(*args, **kwargs)

Thrown when concurrent transaction both attempt to update the same idempotent transaction.

New in version 1.0.

Note

Evolving

Others

delta.pip_utils.configure_spark_with_delta_pip(spark_session_builder: <sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041ded0>) → <sphinx.ext.autodoc.importer._MockObject object at 0x7fa23041ded0>

Utility function to configure a SparkSession builder such that the generated SparkSession will automatically download the required Delta Lake JARs from Maven. This function is required when you want to

  1. Install Delta Lake locally using pip, and

  2. Execute your Python code using Delta Lake + Pyspark directly, that is, not using spark-submit –packages io.delta:… or pyspark –packages io.delta:….

    builder = SparkSession.builder .master(“local[*]”) .appName(“test”)

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

Parameters:spark_session_builder – SparkSession.Builder object being used to configure and create a SparkSession.
Returns:Updated SparkSession.Builder object

New in version 1.0.

Note

Evolving

Indices and tables