Table Batch Reads and Writes

Delta Lake supports most of the options provided by Apache Spark DataFrame read and write APIs for performing batch reads and writes on tables.

Create a table

Use DataFrameWriter (available in Scala, Java, and Python) to write data into Delta Lake as an atomic operation. At a minimum, you must specify the format as delta:

df.write.format("delta").save("/delta/events")

Partition data

You can partition data to speed up queries or DML that have predicates involving the partition columns. To partition data when you create a Delta table, specify the partition columns with partitionBy. A common pattern is to partition by date, for example:

Scala

df.write.format("delta").partitionBy("date").save("/delta/events")

Read a table

You can load a Delta table as a DataFrame by specifying a path:

Scala

spark.read.format("delta").load("/delta/events")
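As a hedged illustration in Python (PySpark), the sketch below loads the table by path and filters on the partition column, which is the kind of predicate that partitioning is meant to speed up. It assumes `spark` is an existing SparkSession configured for Delta Lake and that the table was partitioned by `date`; the function name `read_one_day` is hypothetical.

```python
def read_one_day(spark, day, path="/delta/events"):
    """Load the Delta table at `path` and keep rows for a single date.

    `day` is a date string such as "2019-01-01". The filter is on the
    partition column, so only matching partitions need to be scanned.
    """
    return spark.read.format("delta").load(path).where(f"date = '{day}'")
```

A call such as `read_one_day(spark, "2019-01-01")` returns a DataFrame; nothing is read until an action such as `count()` or `show()` runs.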

Query an older snapshot of a table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta table. Time travel has many use cases, including:

  • Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
  • Writing complex temporal queries.
  • Fixing mistakes in your data.
  • Providing snapshot isolation for a set of queries on fast-changing tables.

This section describes the supported methods for querying older versions of tables, discusses data retention concerns, and provides examples.

Syntax

There are several ways to query an older version of a Delta table.

DataFrameReader options

DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

For timestamp_string, only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01T00:00:00.000Z".
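To make the two reader options concrete, here is a minimal Python sketch. It assumes `spark` is an existing SparkSession configured for Delta Lake; the function names are hypothetical, and formatting the timestamp as `yyyy-MM-dd HH:mm:ss` is one choice of timestamp string among several that should be accepted.

```python
from datetime import datetime


def read_as_of_timestamp(spark, when, path="/delta/events"):
    """Read the snapshot that was current at `when` (a datetime object)."""
    # Format the datetime as a plain timestamp string for the reader option.
    timestamp_string = when.strftime("%Y-%m-%d %H:%M:%S")
    return (
        spark.read.format("delta")
        .option("timestampAsOf", timestamp_string)
        .load(path)
    )


def read_as_of_version(spark, version, path="/delta/events"):
    """Read a specific table version, given as an integer."""
    return spark.read.format("delta").option("versionAsOf", version).load(path)
```

For example, `read_as_of_timestamp(spark, datetime(2019, 1, 1))` passes the string `2019-01-01 00:00:00` as `timestampAsOf`. A naive datetime carries no time zone, so how that string is interpreted depends on the session settings.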

A common pattern is to use the latest state of the Delta table throughout the execution of a Databricks job to update downstream applications.
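One way to read this pattern is that a job resolves the latest table version once and then pins every later read to that version, so all steps see the same snapshot. The Python sketch below follows that interpretation. It assumes `spark` is an existing SparkSession with Delta Lake SQL support enabled (so that `DESCRIBE HISTORY` works on a path-based table); both function names are hypothetical.

```python
def latest_version(spark, path="/delta/events"):
    """Return the highest commit version recorded in the table history."""
    history = spark.sql(f"DESCRIBE HISTORY delta.`{path}`")
    return history.selectExpr("max(version) AS version").first()["version"]


def load_pinned(spark, path="/delta/events"):
    """Load the table fixed to the version that is latest right now."""
    version = latest_version(spark, path)
    return spark.read.format("delta").option("versionAsOf", version).load(path)
```

A job that calls `load_pinned` once and reuses the resulting DataFrame keeps returning the same data, even if the table receives new commits while the job runs.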

Write to a table

Append using DataFrames

Using append mode, you can atomically add new data to an existing Delta table:

df.write.format("delta").mode("append").save("/delta/events")

Overwrite using DataFrames

To atomically replace all of the data in a table, you can use overwrite mode:

df.write.format("delta").mode("overwrite").save("/delta/events")

You can selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in df:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")

This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.
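The predicate for a whole month is easy to get wrong by hand (month lengths, leap years), so a small helper can build it. The following Python sketch assumes the table is partitioned by a `date` column and that `df` contains only rows for the target month; `month_predicate` and `overwrite_month` are hypothetical names.

```python
import calendar
from datetime import date


def month_predicate(year, month, column="date"):
    """Build a replaceWhere predicate covering one calendar month."""
    first = date(year, month, 1)
    # monthrange returns (weekday of first day, number of days in month).
    last = date(year, month, calendar.monthrange(year, month)[1])
    return f"{column} >= '{first.isoformat()}' AND {column} <= '{last.isoformat()}'"


def overwrite_month(df, year, month, path="/delta/events"):
    """Atomically replace one month of the Delta table with the rows in df."""
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", month_predicate(year, month))
        .save(path)
    )
```

`month_predicate(2017, 1)` produces the same predicate used in the command above.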

Note

Unlike the file APIs in Apache Spark, Delta Lake remembers and enforces the schema of a table. This means that, by default, overwrites do not replace the schema of an existing table.

For Delta Lake support for updating tables, see Update a table.

Schema validation

Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is compatible:

  • All DataFrame columns must exist in the target table. If there are columns in the DataFrame not present in the table, an exception is raised. Columns present in the table but not in the DataFrame are set to null.
  • DataFrame column data types must match the column data types in the target table. If they don’t match, an exception is raised.
  • DataFrame column names cannot differ only by case. This means that you cannot have columns such as “Foo” and “foo” defined in the same table. While you can use Spark in case-sensitive or case-insensitive (default) mode, Parquet is case sensitive when storing and returning column information. Delta Lake is case-preserving but case-insensitive when storing the schema, and it has this restriction to avoid potential mistakes, data corruption, or data loss.
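The three rules above can be modeled in a few lines of plain Python. This is a simplified sketch, not Delta Lake's actual implementation: schemas are represented as dicts mapping column name to a type name string, nested types are ignored, and matching DataFrame columns to table columns case-insensitively is an assumption of the sketch. The function name `check_write_compatible` is hypothetical.

```python
def check_write_compatible(df_schema, table_schema):
    """Raise ValueError if df_schema breaks one of the three rules.

    Returns the table columns missing from the DataFrame, which would be
    written as null.
    """
    # Rule 3: DataFrame column names must not differ only by case.
    seen = {}
    for name in df_schema:
        key = name.lower()
        if key in seen:
            raise ValueError(f"columns {seen[key]!r} and {name!r} differ only by case")
        seen[key] = name

    # Index the table schema by lower-cased name (assumption of this sketch).
    table_by_lower = {name.lower(): dtype for name, dtype in table_schema.items()}

    for name, dtype in df_schema.items():
        # Rule 1: every DataFrame column must exist in the target table.
        if name.lower() not in table_by_lower:
            raise ValueError(f"column {name!r} is not in the target table")
        # Rule 2: data types must match.
        if table_by_lower[name.lower()] != dtype:
            raise ValueError(f"type mismatch for column {name!r}")

    df_lower = {name.lower() for name in df_schema}
    return [name for name in table_schema if name.lower() not in df_lower]
```

For a table with columns `id`, `date`, and `note`, a DataFrame that has only `id` and `date` passes the check and the function reports `note` as the column that would be set to null.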

If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.

Automatic schema update

Delta Lake can automatically update the schema of a table as part of a DML transaction (either appending or overwriting), and make the schema compatible with the data being written.

Add columns

Columns that are present in the DataFrame but missing from the table are automatically added as part of a write transaction when:

  • write or writeStream has .option("mergeSchema", "true") set

The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
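A short Python sketch of an append that allows new columns follows. It assumes `df` is a DataFrame whose schema is a superset of the table's schema. The second function is only a simplified model of the resulting top-level column order (new columns last, case preserved); it ignores nested structs and compares names case-insensitively as an assumption. Both function names are hypothetical.

```python
def append_with_new_columns(df, path="/delta/events"):
    """Append df, letting Delta Lake add any columns the table lacks."""
    (
        df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(path)
    )


def merged_column_order(table_columns, df_columns):
    """Model the top-level column order after a mergeSchema write."""
    existing = {name.lower() for name in table_columns}
    # New columns keep their original case and go after the existing ones.
    added = [name for name in df_columns if name.lower() not in existing]
    return list(table_columns) + added
```

In this model, writing a DataFrame with columns `date`, `Country`, `id` into a table with columns `id`, `date` yields the order `id`, `date`, `Country`.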

NullType columns

Because Parquet doesn’t support NullType, NullType columns are dropped from the DataFrame when writing into Delta tables, but are still stored in the schema. When a different data type is received for that column, Delta Lake merges the schema to the new data type. If Delta Lake receives a NullType for an existing column, the old schema is retained and the new column is dropped during the write.

NullType in streaming is not supported. Since you must set schemas when using streaming, this should be very rare. NullType is also not accepted for complex types such as ArrayType and MapType.
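The merge behavior for NullType columns in batch writes can be summarized as a small decision function. The Python sketch below is a simplified model, not Delta Lake code: types are plain strings, `"null"` stands in for NullType, and raising on two different concrete types reflects the schema validation rule that data types must match. The name `merge_column_type` is hypothetical.

```python
NULL = "null"  # stand-in for Spark's NullType in this simplified model


def merge_column_type(table_type, incoming_type):
    """Return the column type kept in the table schema after a write."""
    if incoming_type == NULL:
        # A NullType for an existing column keeps the old schema.
        return table_type
    if table_type == NULL:
        # The first concrete type received replaces the stored NullType.
        return incoming_type
    if table_type != incoming_type:
        raise ValueError(f"cannot merge {table_type} with {incoming_type}")
    return table_type
```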

Replace table schema

By default, overwriting the data in a table does not overwrite the schema. When overwriting a table using mode("overwrite") without replaceWhere, you may also want to replace the table schema with the schema of the data being written. You can replace the schema and partitioning of the table by setting the overwriteSchema option to true:

df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/delta/events")

Views on tables

Delta Lake supports the creation of views on top of Delta tables just like you might with a data source table.

The core challenge when you operate with views is resolving the schemas. If you alter a Delta table schema, you must recreate derivative views to account for any additions to the schema. For instance, if you add a new column to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
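As a hedged sketch of recreating a view after a schema change, the Python function below re-registers a view over the table so that newly added columns become visible through it. It uses a session-scoped temporary view as a stand-in for whatever kind of view you maintain; `spark` is assumed to be an existing SparkSession configured for Delta Lake, and `refresh_events_view` and `events_view` are hypothetical names.

```python
def refresh_events_view(spark, view_name="events_view", path="/delta/events"):
    """Re-create a temporary view so it reflects the table's current schema."""
    # Loading the table again picks up its current schema before the
    # view is replaced.
    spark.read.format("delta").load(path).createOrReplaceTempView(view_name)
```

Calling `refresh_events_view(spark)` after adding a column to the table replaces the old view definition in the current session.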